JobData/app/core/algorithms/antispider.py
2026-03-22 23:22:30 +08:00

203 lines
7.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
from typing import Dict, Any, Optional, List, Tuple
import random
class IPStrategyConfig:
    """Tunable thresholds that drive the IP routing strategy."""

    def __init__(self,
                 response_time_threshold_sec: int = 5,
                 proxy_failure_threshold: int = 3,
                 local_cooldown_sec: int = 1800,
                 local_failure_threshold: int = 2):
        """Store the strategy thresholds.

        Args:
            response_time_threshold_sec (int): Elapsed-time threshold, in
                seconds, for a single request.
            proxy_failure_threshold (int): Number of consecutive failures on
                the same proxy that triggers a switch.
            local_cooldown_sec (int): Cooldown, in seconds, between uses of
                the local IP.
            local_failure_threshold (int): Number of consecutive local-IP
                failures after which routing goes back to the proxy pool.

        Returns:
            None
        """
        self.response_time_threshold_sec = response_time_threshold_sec
        self.proxy_failure_threshold = proxy_failure_threshold
        self.local_cooldown_sec = local_cooldown_sec
        self.local_failure_threshold = local_failure_threshold

    def update(self, updates: Dict[str, Any]) -> None:
        """Update configuration fields in place.

        Only keys that name an existing instance attribute are applied; any
        other key is silently ignored.

        Args:
            updates (Dict[str, Any]): Mapping of field name to new value.

        Returns:
            None
        """
        # Check the instance dict rather than hasattr(): hasattr() is also
        # true for methods and dunders, so a key such as "update" would
        # replace this method on the instance and "__class__" would raise.
        fields = self.__dict__
        for key, value in updates.items():
            if key in fields:
                setattr(self, key, value)
class IPAnomalyDetector:
    """Classify a request outcome as an IP-related anomaly, if it is one."""

    def __init__(self, cfg: "IPStrategyConfig"):
        """Create the detector.

        Args:
            cfg (IPStrategyConfig): Strategy configuration; only
                ``response_time_threshold_sec`` is read by this class.

        Returns:
            None
        """
        self.cfg = cfg

    def detect(self, status_code: Optional[int], elapsed_sec: float, resp_json: Optional[Dict], error_text: str = "") -> Optional[str]:
        """Detect whether the outcome of a request indicates an IP anomaly.

        Checks run in this order and the first match wins: HTTP status,
        response time, JSON body, error text.

        Args:
            status_code (Optional[int]): HTTP status code; may be None when
                the request raised before a response was received.
            elapsed_sec (float): Time the request took, in seconds.
            resp_json (Optional[Dict]): Decoded JSON response body. Bodies
                that are not a dict (for example a JSON array) are skipped.
            error_text (str): Text of the exception, if any.

        Returns:
            Optional[str]: ``"http_<code>"``, ``"slow_response"`` or
            ``"ip_banned"``; None when nothing anomalous was found.
        """
        if status_code in (403, 429, 407):
            return f"http_{status_code}"
        if elapsed_sec > self.cfg.response_time_threshold_sec:
            return "slow_response"
        # A JSON body can decode to a list or a scalar; only a dict has the
        # "message"/"code" fields inspected here, so anything else is skipped
        # instead of raising AttributeError on .get().
        if resp_json and isinstance(resp_json, dict):
            msg = str(resp_json.get("message", ""))
            code = resp_json.get("code")
            if code == 35 or ("IP地址存在异常" in msg or ("IP" in msg and "异常" in msg)):
                return "ip_banned"
        if error_text and ("IP" in error_text and "异常" in error_text):
            return "ip_banned"
        return None
class SmartIPManager:
    """Choose between a pool of proxies and the local IP for outgoing requests.

    The manager tracks consecutive failures for the active route, eliminates
    proxies that reach the failure threshold, and rate-limits use of the local
    IP with a cooldown and an optional temporary disable window.
    """

    def __init__(self, proxy_pool: Optional[List[Dict[str, str]]], cfg: "IPStrategyConfig"):
        """Create the manager.

        Args:
            proxy_pool (Optional[List[Dict[str, str]]]): Proxy list; each
                element is a requests-compatible proxy dict. None or an
                empty list starts the manager in local mode.
            cfg (IPStrategyConfig): Strategy configuration.

        Returns:
            None
        """
        self.cfg = cfg
        self.proxy_pool: List[Dict[str, str]] = proxy_pool or []
        # Indices into proxy_pool that reached the failure threshold.
        self.eliminated: set = set()
        self.current_mode: str = 'proxy' if self.proxy_pool else 'local'
        self.current_index: int = 0
        self.proxy_failures_current: int = 0
        self.local_failures: int = 0
        # time.monotonic() has an undefined reference point, so 0.0 is not a
        # safe "never used" marker: with a small monotonic reading the local
        # IP would look as if it were still cooling down. -inf makes the
        # cooldown check pass until the local IP has actually been used.
        self.last_local_use_time: float = float("-inf")
        self.local_disabled_until: float = 0.0

    def current_route(self) -> Tuple[str, Optional[Dict[str, str]]]:
        """Return the current route mode and its proxy configuration.

        Returns:
            Tuple[str, Optional[Dict[str, str]]]: ``('proxy', proxy_dict)``
            when in proxy mode with a non-empty pool, else ``('local', None)``.
        """
        if self.current_mode == 'proxy' and self.proxy_pool:
            return 'proxy', self.proxy_pool[self.current_index]
        return 'local', None

    def mark_success(self) -> None:
        """Reset the failure counter of the active mode after a success."""
        if self.current_mode == 'proxy':
            self.proxy_failures_current = 0
        else:
            self.local_failures = 0

    def mark_failure(self, reason: str = "") -> None:
        """Record a failed request on the active route.

        In proxy mode the current proxy index is added to ``eliminated`` once
        its consecutive failures reach ``cfg.proxy_failure_threshold``.

        Args:
            reason (str): Failure reason; accepted but not used here.

        Returns:
            None
        """
        if self.current_mode == 'proxy':
            self.proxy_failures_current += 1
            if self.proxy_failures_current >= self.cfg.proxy_failure_threshold:
                self.eliminated.add(self.current_index)
        else:
            self.local_failures += 1

    def select_next_route(self) -> Tuple[str, Optional[Dict[str, str]]]:
        """Pick the route (proxy or local) to use for the next request.

        In proxy mode, once the current proxy has reached the failure
        threshold, the local IP is preferred if it is available; otherwise the
        next non-eliminated proxy is used; if there is none, the local IP is
        used as the last resort even when it is cooling down or disabled.
        In local mode, once local failures reach the threshold, routing moves
        to the next non-eliminated proxy if one exists.

        Returns:
            Tuple[str, Optional[Dict[str, str]]]: ``('proxy', proxy_dict)``
            or ``('local', None)``.
        """
        now = time.monotonic()

        if self.current_mode != 'proxy':
            if self.local_failures >= self.cfg.local_failure_threshold:
                next_idx = self._next_proxy_index()
                if next_idx is not None:
                    self.current_mode = 'proxy'
                    self.current_index = next_idx
                    self.local_failures = 0
                    return 'proxy', self.proxy_pool[self.current_index]
            return 'local', None

        if self.proxy_failures_current >= self.cfg.proxy_failure_threshold:
            if self._local_available(now):
                return self._switch_to_local(now)
            next_idx = self._next_proxy_index()
            if next_idx is not None:
                self.current_index = next_idx
                self.proxy_failures_current = 0
                return 'proxy', self.proxy_pool[self.current_index]
            # No usable proxy left: fall back to local regardless of cooldown.
            return self._switch_to_local(now)

        if self.proxy_pool:
            return 'proxy', self.proxy_pool[self.current_index]
        self.current_mode = 'local'
        return 'local', None

    def _switch_to_local(self, now: float) -> Tuple[str, Optional[Dict[str, str]]]:
        """Enter local mode, stamp its use time and clear the proxy failure count."""
        self.current_mode = 'local'
        self.last_local_use_time = now
        self.proxy_failures_current = 0
        return 'local', None

    def _next_proxy_index(self) -> Optional[int]:
        """Find the next proxy index that has not been eliminated.

        The search starts after ``current_index`` and wraps around, so the
        current index itself is the last candidate examined.

        Returns:
            Optional[int]: The index, or None when the pool is empty or every
            proxy has been eliminated.
        """
        if not self.proxy_pool:
            return None
        n = len(self.proxy_pool)
        for step in range(1, n + 1):
            cand = (self.current_index + step) % n
            if cand not in self.eliminated:
                return cand
        return None

    def _local_available(self, now: float) -> bool:
        """Tell whether the local IP may be used at monotonic time ``now``.

        Args:
            now (float): Current ``time.monotonic()`` reading.

        Returns:
            bool: False inside the temporary-disable window or while the
            cooldown since the last local use has not elapsed; True otherwise.
        """
        if now < self.local_disabled_until:
            return False
        return (now - self.last_local_use_time) >= self.cfg.local_cooldown_sec

    def disable_local_temporarily(self, seconds: int) -> None:
        """Disable the local IP for ``seconds`` (negative values count as 0)."""
        self.local_disabled_until = time.monotonic() + max(0, seconds)

    def manual_switch_to_proxy(self, index: int) -> None:
        """Switch to the proxy at ``index`` on request of the caller.

        The call is ignored when ``index`` is out of range or the proxy has
        been eliminated.

        Args:
            index (int): Index into the proxy pool.

        Returns:
            None
        """
        if 0 <= index < len(self.proxy_pool) and index not in self.eliminated:
            self.current_mode = 'proxy'
            self.current_index = index
            self.proxy_failures_current = 0

    def enable_local(self) -> None:
        """Clear the temporary-disable window so the local IP is allowed again."""
        self.local_disabled_until = 0.0
def generate_boss_trace_id() -> str:
    """Generate a trace_id in the format used for Boss Zhipin requests.

    Per the original author's note this follows Boss Zhipin's own algorithm:
      1. Take the last 6 hex digits of the current millisecond timestamp.
      2. Draw 10 random characters from digits, lowercase and uppercase
         ASCII letters.
      3. Join them as ``F-{timestamp_hex}{random_string}``.

    Returns:
        str: The generated trace id.
    """
    # Last 6 hex digits of the current time in milliseconds.
    millis = int(time.time() * 1000)
    stamp = format(millis, "x")[-6:]

    # Alphabet: digits + lowercase letters + uppercase letters.
    alphabet = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

    # Ten independent draws, one character each.
    picked = []
    for _ in range(10):
        picked.append(random.choice(alphabet))

    return "F-" + stamp + "".join(picked)
def generate_token() -> str:
    """Return a random 32-character lowercase hexadecimal string.

    Characters are drawn one at a time with ``random.choice``.
    NOTE(review): ``random`` is not a cryptographically secure source; if this
    token is ever used as a secret, confirm whether ``secrets`` is required.

    Returns:
        str: 32 characters from ``0-9a-f``.
    """
    hex_digits = "0123456789abcdef"
    pieces = []
    while len(pieces) < 32:
        pieces.append(random.choice(hex_digits))
    return "".join(pieces)