import os
import random
import time
from typing import Dict, Any, Optional, List, Tuple


class IPStrategyConfig:
    """Tunable thresholds that drive the IP routing strategy."""

    def __init__(self,
                 response_time_threshold_sec: int = 5,
                 proxy_failure_threshold: int = 3,
                 local_cooldown_sec: int = 1800,
                 local_failure_threshold: int = 2):
        """Store the strategy thresholds.

        Args:
            response_time_threshold_sec (int): Per-request elapsed-time
                threshold, in seconds.
            proxy_failure_threshold (int): Number of consecutive failures on
                the same proxy that triggers a switch.
            local_cooldown_sec (int): Cooldown, in seconds, between uses of
                the local (non-proxied) IP.
            local_failure_threshold (int): Number of consecutive local
                failures after which routing goes back to the proxy pool.

        Returns:
            None
        """
        self.response_time_threshold_sec = response_time_threshold_sec
        self.proxy_failure_threshold = proxy_failure_threshold
        self.local_cooldown_sec = local_cooldown_sec
        self.local_failure_threshold = local_failure_threshold

    def update(self, updates: Dict[str, Any]) -> None:
        """Apply a partial configuration update at runtime.

        Only keys naming an existing configuration field (an instance
        attribute set in ``__init__``) are applied; every other key is
        silently ignored.

        Args:
            updates (Dict[str, Any]): Mapping of field name to new value.

        Returns:
            None
        """
        # Check the instance dict rather than hasattr(): hasattr() is also
        # true for methods and dunder attributes, which would let a stray key
        # such as "update" or "__class__" clobber the object itself.
        known_fields = vars(self)
        for key, value in updates.items():
            if key in known_fields:
                setattr(self, key, value)


class IPAnomalyDetector:
    """Classify a request outcome as an IP-related anomaly (or not)."""

    def __init__(self, cfg: IPStrategyConfig):
        """Create the detector.

        Args:
            cfg (IPStrategyConfig): Strategy configuration; only
                ``response_time_threshold_sec`` is read here.

        Returns:
            None
        """
        self.cfg = cfg

    def detect(self,
               status_code: Optional[int],
               elapsed_sec: float,
               resp_json: Optional[Dict],
               error_text: str = "") -> Optional[str]:
        """Check a request outcome for signs of an IP anomaly.

        Checks run in priority order: HTTP status, response time, response
        body, then error text. The first match wins.

        Args:
            status_code (Optional[int]): HTTP status code; may be None when
                the request raised.
            elapsed_sec (float): Elapsed request time in seconds.
            resp_json (Optional[Dict]): Decoded JSON response body.
            error_text (str): Text of the raised error, if any.

        Returns:
            Optional[str]: ``"http_<code>"``, ``"slow_response"`` or
            ``"ip_banned"``; None when nothing anomalous was detected.
        """
        if status_code in (403, 429, 407):
            return f"http_{status_code}"

        if elapsed_sec > self.cfg.response_time_threshold_sec:
            return "slow_response"

        # A JSON body is not necessarily an object; only dicts carry the
        # "message"/"code" fields inspected below, so anything else is skipped
        # instead of raising AttributeError on .get().
        if resp_json and isinstance(resp_json, dict):
            msg = str(resp_json.get("message", ""))
            code = resp_json.get("code")
            if code == 35 or ("IP地址存在异常" in msg or ("IP" in msg and "异常" in msg)):
                return "ip_banned"

        if error_text and ("IP" in error_text and "异常" in error_text):
            return "ip_banned"

        return None


class SmartIPManager:
    """Route selector that alternates between a proxy pool and the local IP."""

    def __init__(self, proxy_pool: Optional[List[Dict[str, str]]], cfg: IPStrategyConfig):
        """Create the manager.

        Starts in ``'proxy'`` mode when the pool is non-empty, otherwise in
        ``'local'`` mode.

        Args:
            proxy_pool (Optional[List[Dict[str, str]]]): Proxy list; each
                element is a requests-compatible proxies dict.
            cfg (IPStrategyConfig): Strategy configuration.

        Returns:
            None
        """
        self.cfg = cfg
        self.proxy_pool: List[Dict[str, str]] = proxy_pool or []
        # Indices into proxy_pool that reached the failure threshold.
        self.eliminated: set = set()
        self.current_mode: str = 'proxy' if self.proxy_pool else 'local'
        self.current_index: int = 0
        self.proxy_failures_current: int = 0
        self.local_failures: int = 0
        # "-inf" means "never used". time.monotonic() has an undefined
        # reference point, so a 0.0 sentinel would make the cooldown check
        # depend on how large the clock reading happens to be.
        self.last_local_use_time: float = float("-inf")
        self.local_disabled_until: float = 0.0

    def current_route(self) -> Tuple[str, Optional[Dict[str, str]]]:
        """Return the current route as ``(mode, proxies)``.

        Returns:
            Tuple[str, Optional[Dict[str, str]]]: ``('proxy', proxies_dict)``
            or ``('local', None)``.
        """
        if self.current_mode == 'proxy' and self.proxy_pool:
            return 'proxy', self.proxy_pool[self.current_index]
        return 'local', None

    def mark_success(self) -> None:
        """Reset the failure counter of the active mode after a success."""
        if self.current_mode == 'proxy':
            self.proxy_failures_current = 0
        else:
            self.local_failures = 0

    def mark_failure(self, reason: str = "") -> None:
        """Record a failure on the active route.

        In proxy mode the current proxy index is added to ``eliminated`` once
        its consecutive failures reach ``cfg.proxy_failure_threshold``.

        Args:
            reason (str): Failure reason; accepted but not used here.

        Returns:
            None
        """
        if self.current_mode == 'proxy':
            self.proxy_failures_current += 1
            if self.proxy_failures_current >= self.cfg.proxy_failure_threshold:
                self.eliminated.add(self.current_index)
        else:
            self.local_failures += 1

    def select_next_route(self) -> Tuple[str, Optional[Dict[str, str]]]:
        """Pick the route for the next request and update internal state.

        Proxy mode: below the failure threshold the current proxy is kept.
        At the threshold the local IP is preferred when available, then the
        next non-eliminated proxy, and finally the local IP as a last resort
        (this fallback does not consult the cooldown/disable window).

        Local mode: once local failures reach the threshold, switch to the
        next non-eliminated proxy if one exists; otherwise stay local.

        Returns:
            Tuple[str, Optional[Dict[str, str]]]: ``('proxy', proxies_dict)``
            or ``('local', None)``.
        """
        now = time.monotonic()

        if self.current_mode == 'proxy':
            if self.proxy_failures_current >= self.cfg.proxy_failure_threshold:
                if self._local_available(now):
                    self.current_mode = 'local'
                    self.last_local_use_time = now
                    self.proxy_failures_current = 0
                    return 'local', None

                next_idx = self._next_proxy_index()
                if next_idx is not None:
                    self.current_index = next_idx
                    self.proxy_failures_current = 0
                    return 'proxy', self.proxy_pool[self.current_index]

                # Every proxy is eliminated: fall back to the local IP.
                self.current_mode = 'local'
                self.last_local_use_time = now
                self.proxy_failures_current = 0
                return 'local', None

            if self.proxy_pool:
                return 'proxy', self.proxy_pool[self.current_index]

            self.current_mode = 'local'
            return 'local', None

        if self.local_failures >= self.cfg.local_failure_threshold:
            next_idx = self._next_proxy_index()
            if next_idx is not None:
                self.current_mode = 'proxy'
                self.current_index = next_idx
                self.local_failures = 0
                return 'proxy', self.proxy_pool[self.current_index]
        return 'local', None

    def _next_proxy_index(self) -> Optional[int]:
        """Find the next proxy index that has not been eliminated.

        The search starts just after ``current_index``, wraps around, and
        considers ``current_index`` itself last.

        Returns:
            Optional[int]: The index, or None when the pool is empty or every
            proxy is eliminated.
        """
        if not self.proxy_pool:
            return None
        n = len(self.proxy_pool)
        for step in range(1, n + 1):
            cand = (self.current_index + step) % n
            if cand not in self.eliminated:
                return cand
        return None

    def _local_available(self, now: float) -> bool:
        """Tell whether the local IP may be used at time ``now``.

        Args:
            now (float): Current ``time.monotonic()`` reading.

        Returns:
            bool: False inside the temporary-disable window or while the
            cooldown since the last local use has not elapsed.
        """
        if now < self.local_disabled_until:
            return False
        return (now - self.last_local_use_time) >= self.cfg.local_cooldown_sec

    def disable_local_temporarily(self, seconds: int) -> None:
        """Disable the local IP for ``seconds`` (negative values count as 0)."""
        self.local_disabled_until = time.monotonic() + max(0, seconds)

    def manual_switch_to_proxy(self, index: int) -> None:
        """Manually switch to the proxy at ``index``.

        The call is a no-op when ``index`` is out of range or refers to an
        eliminated proxy.
        """
        if 0 <= index < len(self.proxy_pool) and index not in self.eliminated:
            self.current_mode = 'proxy'
            self.current_index = index
            self.proxy_failures_current = 0

    def enable_local(self) -> None:
        """Clear the temporary-disable window for the local IP."""
        self.local_disabled_until = 0.0


def generate_boss_trace_id() -> str:
    """Generate a Boss Zhipin style trace id.

    Per the original author's note this mirrors the site's own algorithm:
      1. Take the hex form of the current millisecond timestamp and keep its
         last 6 characters.
      2. Generate a 10-character random string over digits, lowercase and
         uppercase letters.
      3. Join them as ``F-{timestamp_hex}{random_string}``.

    Returns:
        str: The trace id.
    """
    # Last 6 hex digits of the current timestamp in milliseconds.
    timestamp_hex = hex(int(time.time() * 1000))[2:][-6:]

    # Character set: digits + lowercase letters + uppercase letters.
    charset = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

    random_string = ''.join(random.choice(charset) for _ in range(10))

    trace_id = f"F-{timestamp_hex}{random_string}"
    return trace_id


def generate_token() -> str:
    """Return a random 32-character lowercase hexadecimal string.

    NOTE(review): this uses the ``random`` module, which is not suitable for
    security-sensitive tokens; confirm the token is only a request
    placeholder before relying on it for anything secret.
    """
    chars = "0123456789abcdef"
    return ''.join(random.choice(chars) for _ in range(32))