203 lines
7.7 KiB
Python
203 lines
7.7 KiB
Python
import time
|
||
from typing import Dict, Any, Optional, List, Tuple
|
||
import random
|
||
|
||
class IPStrategyConfig:
    """Tunable thresholds for the IP routing strategy."""

    def __init__(self,
                 response_time_threshold_sec: int = 5,
                 proxy_failure_threshold: int = 3,
                 local_cooldown_sec: int = 1800,
                 local_failure_threshold: int = 2):
        """IP strategy configuration.

        Args:
            response_time_threshold_sec (int): Per-request elapsed-time threshold
                in seconds.
            proxy_failure_threshold (int): Number of consecutive failures on one
                proxy that triggers a switch.
            local_cooldown_sec (int): Cooldown in seconds between uses of the
                local IP.
            local_failure_threshold (int): Number of consecutive local-IP failures
                after which routing goes back to the proxy pool.

        Returns:
            None
        """
        self.response_time_threshold_sec = response_time_threshold_sec
        self.proxy_failure_threshold = proxy_failure_threshold
        self.local_cooldown_sec = local_cooldown_sec
        self.local_failure_threshold = local_failure_threshold

    def update(self, updates: Dict[str, Any]) -> None:
        """Dynamically update configuration values.

        Only keys that name an existing instance attribute (the config fields
        set in ``__init__``) are applied; any other key is silently ignored.

        Args:
            updates (Dict[str, Any]): Mapping of field name to new value.

        Returns:
            None
        """
        # vars(self) holds only instance data, so method names such as
        # "update" and dunders such as "__class__" can no longer be
        # overwritten (hasattr() accepted them).
        known_fields = vars(self)
        for key, value in updates.items():
            if key in known_fields:
                setattr(self, key, value)
|
||
|
||
|
||
class IPAnomalyDetector:
    """Classify a single request outcome as an IP-related anomaly, or not."""

    def __init__(self, cfg: "IPStrategyConfig"):
        """Anomaly detector.

        Args:
            cfg (IPStrategyConfig): Strategy configuration; only
                ``response_time_threshold_sec`` is read.

        Returns:
            None
        """
        self.cfg = cfg

    def detect(self, status_code: Optional[int], elapsed_sec: float, resp_json: Optional[Dict], error_text: str = "") -> Optional[str]:
        """Detect whether a request outcome indicates an IP anomaly.

        Checks are applied in this order and the first match wins:
        HTTP 403/429/407, slow response, JSON ban markers, error-text ban markers.

        Args:
            status_code (Optional[int]): HTTP status code; may be None when the
                request raised.
            elapsed_sec (float): Request duration in seconds.
            resp_json (Optional[Dict]): Decoded response body. Non-dict bodies
                (e.g. a JSON array) are ignored for the ban check.
            error_text (str): Exception text, if any.

        Returns:
            Optional[str]: A reason tag ("http_<code>", "slow_response" or
            "ip_banned"), or None when no anomaly is detected.
        """
        if status_code in (403, 429, 407):
            return f"http_{status_code}"
        if elapsed_sec > self.cfg.response_time_threshold_sec:
            return "slow_response"
        # A decoded JSON body can legitimately be a list or scalar; only a dict
        # has code/message fields, and calling .get() on anything else raises.
        if isinstance(resp_json, dict) and resp_json:
            msg = str(resp_json.get("message", ""))
            # NOTE(review): code 35 is treated as an IP ban; its meaning comes
            # from the upstream API and is not documented here.
            if resp_json.get("code") == 35 or self._mentions_ip_anomaly(msg):
                return "ip_banned"
        if error_text and self._mentions_ip_anomaly(error_text):
            return "ip_banned"
        return None

    @staticmethod
    def _mentions_ip_anomaly(text: str) -> bool:
        """Return True if *text* contains both "IP" and "异常" ("anomaly")."""
        # This also covers the literal message "IP地址存在异常".
        return "IP" in text and "异常" in text
|
||
|
||
|
||
class SmartIPManager:
    """Pick the outgoing route for each request: a pooled proxy or the local IP.

    A proxy that reaches ``cfg.proxy_failure_threshold`` consecutive failures
    is eliminated for the lifetime of this manager. Falling back from a failing
    proxy to the local IP is gated by ``cfg.local_cooldown_sec`` and by
    ``disable_local_temporarily``. Once every proxy is eliminated, the local IP
    is used as a last resort.
    """

    def __init__(self, proxy_pool: Optional[List[Dict[str, str]]], cfg: "IPStrategyConfig"):
        """Smart IP manager.

        Args:
            proxy_pool (Optional[List[Dict[str, str]]]): Proxy pool; each element
                is a requests-compatible proxies dict. None or empty means
                local-only routing.
            cfg (IPStrategyConfig): Strategy configuration, read on every
                routing decision.

        Returns:
            None
        """
        self.cfg = cfg
        # A non-empty caller list is stored by reference, not copied.
        self.proxy_pool: List[Dict[str, str]] = proxy_pool or []
        # Indices into proxy_pool retired after too many consecutive failures.
        self.eliminated: set = set()
        self.current_mode: str = 'proxy' if self.proxy_pool else 'local'
        self.current_index: int = 0
        # Consecutive failures of the proxy at current_index.
        self.proxy_failures_current: int = 0
        # Consecutive failures while routing through the local IP.
        self.local_failures: int = 0
        # time.monotonic() has an undefined reference point (it can be only a
        # few seconds after boot), so 0.0 is not a safe "never used" marker:
        # the first cooldown check could wrongly fail. -inf always passes it.
        self.last_local_use_time: float = float('-inf')
        # Monotonic deadline before which the local IP must not be used.
        self.local_disabled_until: float = float('-inf')

    def current_route(self) -> Tuple[str, Optional[Dict[str, str]]]:
        """Return the current routing mode and its proxy config.

        Returns:
            Tuple[str, Optional[Dict[str, str]]]: ('proxy', proxies_dict) or
            ('local', None).
        """
        if self.current_mode == 'proxy' and self.proxy_pool:
            return 'proxy', self.proxy_pool[self.current_index]
        return 'local', None

    def mark_success(self) -> None:
        """Reset the consecutive-failure counter of the current route."""
        if self.current_mode == 'proxy':
            self.proxy_failures_current = 0
        else:
            self.local_failures = 0

    def mark_failure(self, reason: str = "") -> None:
        """Record a failed request on the current route.

        On a proxy, the proxy is eliminated once its consecutive failures reach
        ``cfg.proxy_failure_threshold``. The actual switch happens in
        ``select_next_route``.

        Args:
            reason (str): Failure label; accepted but currently unused.

        Returns:
            None
        """
        if self.current_mode == 'proxy':
            self.proxy_failures_current += 1
            if self.proxy_failures_current >= self.cfg.proxy_failure_threshold:
                self.eliminated.add(self.current_index)
        else:
            self.local_failures += 1

    def select_next_route(self) -> Tuple[str, Optional[Dict[str, str]]]:
        """Select the route for the next request (proxy or local IP).

        Proxy mode: once the current proxy has hit its failure threshold, move
        to the local IP if it is available (not disabled, cooldown elapsed).
        Otherwise move to the next live proxy. If none is left, fall back to
        local as a last resort.

        Local mode: leave for the next live proxy when the local failure
        threshold is reached or the local IP has been disabled. This avoids an
        endless stay on the local IP.

        Returns:
            Tuple[str, Optional[Dict[str, str]]]: ('proxy', proxies_dict) or
            ('local', None).
        """
        now = time.monotonic()

        if self.current_mode == 'proxy':
            if self.proxy_failures_current >= self.cfg.proxy_failure_threshold:
                if self._local_available(now):
                    return self._switch_to_local(now)
                next_idx = self._next_proxy_index()
                if next_idx is not None:
                    self.current_index = next_idx
                    self.proxy_failures_current = 0
                    return 'proxy', self.proxy_pool[self.current_index]
                # Every proxy is eliminated: local is the only route left.
                return self._switch_to_local(now)
            if self.proxy_pool:
                return 'proxy', self.proxy_pool[self.current_index]
            self.current_mode = 'local'
            return 'local', None

        # Local mode. Honour disable_local_temporarily() here too; previously
        # it only took effect when *entering* local mode.
        local_disabled = now < self.local_disabled_until
        if local_disabled or self.local_failures >= self.cfg.local_failure_threshold:
            next_idx = self._next_proxy_index()
            if next_idx is not None:
                self.current_mode = 'proxy'
                self.current_index = next_idx
                self.local_failures = 0
                return 'proxy', self.proxy_pool[self.current_index]
        return 'local', None

    def _switch_to_local(self, now: float) -> Tuple[str, Optional[Dict[str, str]]]:
        """Enter local mode, start its cooldown clock and reset proxy failures."""
        self.current_mode = 'local'
        self.last_local_use_time = now
        self.proxy_failures_current = 0
        return 'local', None

    def _next_proxy_index(self) -> Optional[int]:
        """Return the next non-eliminated proxy index, or None if there is none.

        The search starts right after current_index and wraps around. The
        last candidate checked is current_index itself.
        """
        pool_size = len(self.proxy_pool)
        if pool_size == 0:
            return None
        candidates = ((self.current_index + step) % pool_size for step in range(1, pool_size + 1))
        return next((idx for idx in candidates if idx not in self.eliminated), None)

    def _local_available(self, now: float) -> bool:
        """Return True if the local IP is neither disabled nor in cooldown at *now*."""
        if now < self.local_disabled_until:
            return False
        return (now - self.last_local_use_time) >= self.cfg.local_cooldown_sec

    def disable_local_temporarily(self, seconds: int) -> None:
        """Forbid the local IP for *seconds* (negative values count as 0)."""
        self.local_disabled_until = time.monotonic() + max(0, seconds)

    def manual_switch_to_proxy(self, index: int) -> None:
        """Manually select the proxy at *index*.

        Out-of-range or eliminated indices are silently ignored.
        """
        if 0 <= index < len(self.proxy_pool) and index not in self.eliminated:
            self.current_mode = 'proxy'
            self.current_index = index
            self.proxy_failures_current = 0

    def enable_local(self) -> None:
        """Lift any temporary disable on the local IP (the cooldown still applies)."""
        self.local_disabled_until = float('-inf')
|
||
|
||
def generate_boss_trace_id() -> str:
    """Build a Boss Zhipin ``trace_id`` string.

    Follows the algorithm the original author attributes to Boss Zhipin:

    1. Render the current Unix time in milliseconds as hex and keep the last
       6 hex digits.
    2. Draw 10 random characters from digits, lowercase and uppercase ASCII
       letters.
    3. Concatenate as ``F-{timestamp_hex}{random_string}``.

    Returns:
        str: The trace id. For present-day clocks this is 18 characters long.
    """
    millis_hex = hex(int(time.time() * 1000))[2:]
    ts_tail = millis_hex[-6:]

    alphabet = "0123456789" "abcdefghijklmnopqrstuvwxyz" "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    drawn = [random.choice(alphabet) for _ in range(10)]

    return "F-" + ts_tail + "".join(drawn)
|
||
|
||
def generate_token() -> str:
    """Return a random 32-character lowercase hexadecimal token.

    Uses the ``secrets`` CSPRNG. The ``random`` module is a seedable,
    predictable PRNG that the Python docs say must not be used for security
    purposes, so it is not appropriate for minting tokens.

    Returns:
        str: 32 characters from ``0-9a-f`` (128 bits of randomness).
    """
    import secrets

    # 16 random bytes hex-encode to exactly 32 lowercase hex characters.
    return secrets.token_hex(16)
|