JobData/crawler_core/http_client.py
win ceb359d535 feat(01-01): create crawler_core/http_client.py with tenacity retry and stdlib logging
- Port HTTPClient from spiderJobs/core/http_client.py
- Add tenacity @retry decorator on post() and get() (3 attempts, min=10s wait)
- Use stdlib logging.getLogger('crawler_core.http_client') — no loguru
- No imports from spiderJobs.* or app.*
- TLS fingerprint and proxy logic preserved unchanged
2026-03-21 18:08:59 +08:00

192 lines
6.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
crawler_core.http_client — 通用 HTTP 客户端
基于 requests-go自带 Chrome TLS 指纹伪装TLS_CHROME_LATEST + random_ja3=True
支持代理 IP / 隧道代理 / 代理池轮换。
内置 tenacity 重试3次指数退避最小10秒间隔
使用 stdlib logging — 上层可通过 logging.getLogger('crawler_core') 配置。
不依赖 loguru / FastAPI / Tortoise-ORM 等应用框架。
"""
from __future__ import annotations
import logging
import random
from typing import Any, Optional
import requests_go as requests
from requests_go.tls_config import TLS_CHROME_LATEST
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_random_exponential,
)
# Stdlib logger with an explicit dotted name, so it is a child of the
# "crawler_core" logger and inherits whatever configuration callers apply there.
logger = logging.getLogger("crawler_core.http_client")
def _log_retry(retry_state: Any) -> None:
    """Log a warning before tenacity sleeps between two attempts.

    For a bound method call ``retry_state.args[0]`` is ``self``, so the request
    path is ``args[1]`` only when it was passed positionally. When the caller
    used ``path=...`` it lives in ``retry_state.kwargs`` instead; indexing
    ``args[1]`` unconditionally would raise ``IndexError`` inside the hook and
    hide the real network error.
    """
    positional = retry_state.args
    if len(positional) > 1:
        target = positional[1]
    else:
        target = retry_state.kwargs.get("path", "unknown")
    logger.warning(
        "HTTP retry attempt=%d url=%s error=%s",
        retry_state.attempt_number,
        target,
        retry_state.outcome.exception(),
    )


# Single retry policy shared by get() and post(): at most 3 attempts, a
# randomized exponential wait bounded to 10-30 seconds, and the last exception
# re-raised as-is (reraise=True) instead of being wrapped in RetryError.
# The builtin ConnectionError and TimeoutError are subclasses of OSError; the
# tuple is kept verbatim from the original policy.
# NOTE(review): if requests-go's exceptions derive from OSError the way
# requests' do, a non-JSON response body would also be retried — confirm.
_retry_on_network_error = retry(
    stop=stop_after_attempt(3),
    wait=wait_random_exponential(multiplier=1, min=10, max=30),
    retry=retry_if_exception_type((ConnectionError, TimeoutError, OSError)),
    reraise=True,
    before_sleep=_log_retry,
)


class HTTPClient:
    """
    General-purpose HTTP client.

    Args:
        base_url: Base address of the API; request paths are appended to it
            verbatim (no slash normalization).
        default_headers: Headers sent with every request.
        proxy: Fixed proxy address (bound to the session, connection reused).
        tunnel_proxy: Tunnel proxy address (a new session is created for
            every request so the tunnel can rotate the exit IP).
        proxy_pool: List of proxies (one is picked at random per request).
        timeout: Request timeout in seconds (default 10).

    Proxy priority: tunnel_proxy > proxy_pool > proxy.
    Configure only one of the three.

    Proxy format examples:
        Plain proxy:            "http://127.0.0.1:7890"
        SOCKS5 proxy:           "socks5://127.0.0.1:1080"
        Tunnel proxy:           "http://user:pass@tunnel.example.com:12345"
        Tunnel proxy (auth):    "http://account-zone-xxx:password@proxy.host:port"

    Tunnel proxy usage (the IP changes automatically on every request):
        client = HTTPClient(
            base_url="https://example.com",
            tunnel_proxy="http://user:pass@tunnel.example.com:12345",
        )
        # Every get/post opens a fresh session through the tunnel proxy.

    The client can also be used as a context manager; leaving the ``with``
    block closes the long-lived session.
    """

    def __init__(
        self,
        base_url: str,
        default_headers: Optional[dict] = None,
        proxy: Optional[str] = None,
        tunnel_proxy: Optional[str] = None,
        proxy_pool: Optional[list[str]] = None,
        timeout: int = 10,
    ):
        self.base_url = base_url
        self.default_headers = default_headers or {}
        self.timeout = timeout

        # Proxy configuration.
        self._proxy = proxy
        self._tunnel_proxy = tunnel_proxy
        self._proxy_pool = proxy_pool

        # Long-lived session carrying the Chrome TLS fingerprint. It is not
        # used for requests when tunnel_proxy is set (those get a fresh
        # session each time).
        self._session = self._new_session()

        # A fixed proxy is attached to the session once, but only when neither
        # higher-priority proxy mode is configured.
        if proxy and not proxy_pool and not tunnel_proxy:
            self._session.proxies = {"http": proxy, "https": proxy}

    def __enter__(self) -> "HTTPClient":
        """Return the client itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type: Any, exc: Any, tb: Any) -> None:
        """Close the long-lived session when the ``with`` block ends."""
        self.close()

    def close(self) -> None:
        """Close the long-lived session held by this client."""
        self._session.close()

    def _new_session(self) -> requests.Session:
        """Create a brand-new session configured with the Chrome TLS profile."""
        session = requests.Session()
        session.tls_config = TLS_CHROME_LATEST
        # TLS_CHROME_LATEST is a shared module-level object; the flag is set on
        # it directly, so it applies to every session using this profile.
        TLS_CHROME_LATEST.random_ja3 = True
        return session

    def _get_proxies(self) -> Optional[dict]:
        """Return the per-request proxy mapping, or None when none is needed.

        Only the proxy pool yields a per-request mapping. A fixed proxy is
        already set on the session, and the tunnel proxy is handled by the
        request path itself.
        """
        if not self._proxy_pool:
            return None
        chosen = random.choice(self._proxy_pool)
        # A random "#fragment" is appended so that two requests through the
        # same pool entry do not share an identical proxy URL; per the original
        # author's note this is meant to break connection reuse.
        unique = f"{chosen}#{random.randint(100000, 999999)}"
        return {"http": unique, "https": unique}

    def _merge_headers(self, extra: Optional[dict] = None) -> dict:
        """Return a new dict of default headers overridden by ``extra``."""
        headers = {**self.default_headers}
        if extra:
            headers.update(extra)
        return headers

    def _send(self, method: str, path: str, **request_kwargs: Any) -> tuple[int, Any]:
        """Issue one request and return ``(status_code, parsed_json)``.

        Args:
            method: Name of the session method to call ("get" or "post").
            path: Path appended to ``base_url``.
            **request_kwargs: Keyword arguments forwarded to the session method.

        Raises:
            Whatever the session call or ``response.json()`` raises.
        """
        url = f"{self.base_url}{path}"
        request_kwargs["timeout"] = self.timeout

        # Tunnel proxy: use a throw-away session per request so the tunnel can
        # hand out a new IP, and always close it afterwards.
        if self._tunnel_proxy:
            session = self._new_session()
            try:
                response = getattr(session, method)(
                    url,
                    proxies={"http": self._tunnel_proxy, "https": self._tunnel_proxy},
                    **request_kwargs,
                )
                return response.status_code, response.json()
            finally:
                session.close()

        proxies = self._get_proxies()
        if proxies:
            request_kwargs["proxies"] = proxies
        response = getattr(self._session, method)(url, **request_kwargs)
        return response.status_code, response.json()

    @_retry_on_network_error
    def post(self, path: str, body: dict, headers: Optional[dict] = None) -> tuple[int, Any]:
        """Send a POST request with ``body`` as JSON.

        Returns:
            ``(status_code, parsed_json_body)``.
        """
        logger.debug("POST %s%s", self.base_url, path)
        return self._send("post", path, json=body, headers=self._merge_headers(headers))

    @_retry_on_network_error
    def get(self, path: str, params: Optional[dict] = None, headers: Optional[dict] = None) -> tuple[int, Any]:
        """Send a GET request with optional query ``params``.

        Returns:
            ``(status_code, parsed_json_body)``.
        """
        logger.debug("GET %s%s", self.base_url, path)
        return self._send("get", path, params=params, headers=self._merge_headers(headers))