win 04d6303da2 feat(01-01): create crawler_core/base.py with Result[T] and crawler_core/__init__.py
- Define generic Result[T] dataclass (7 fields: success, status_code, data, list, count, is_end_page, error)
- Port parse_response() from spiderJobs/core/base.py returning Result[Any]
- BaseFetcher: 4 template methods (_build_params, _parse required; _build_headers, _check_blocked optional)
- BaseSearcher: 4 template methods with load_all() paginator using stdlib logging
- crawler_core/__init__.py exports BaseFetcher, BaseSearcher, Result, HTTPClient, parse_response
- No ApiResult, no loguru, no spiderJobs/app imports
2026-03-21 18:10:40 +08:00

208 lines
7.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
crawler_core.base — 通用基类与数据结构
提供所有招聘平台共用的: Result, BaseFetcher, BaseSearcher, parse_response
不依赖任何平台特定代码。
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import Any, Generic, Optional, TypeVar
from crawler_core.http_client import HTTPClient
T = TypeVar("T")
_logger = logging.getLogger("crawler_core.base")
# ─────────────────────────────────────────────
# 通用数据结构
# ─────────────────────────────────────────────
@dataclass
class Result(Generic[T]):
    """Uniform outcome record for a fetch or search call.

    Parameterise it with the concrete payload type at the call site,
    e.g. ``Result[MyJobModel]``. Only ``success`` and ``status_code`` are
    required; every other field has a default.
    """

    # True when the call succeeded.
    success: bool
    # Code reported for the call (HTTP status or business-level code).
    status_code: int
    # Payload of the response, when there is one.
    data: Optional[T] = None
    # Items of a list-style response; a fresh empty list per instance.
    list: list[T] = field(default_factory=list)
    # Item count reported alongside a list-style response.
    count: int = 0
    # True when no further page should be requested.
    is_end_page: bool = True
    # Failure description; None when nothing went wrong.
    error: Optional[str] = None
# ─────────────────────────────────────────────
# 通用响应解析(可覆写)
# ─────────────────────────────────────────────
def parse_response(http_code: int, raw: Any) -> Result[Any]:
    """Turn a raw response into a ``Result`` using the default envelope format.

    The default envelope is a dict with a business code under ``statusCode``,
    an optional ``statusDescription`` / ``message`` and the payload under
    ``data``. Platforms whose responses look different should do their own
    parsing instead of calling this function.

    Args:
        http_code: HTTP status code of the response.
        raw: Decoded response body. Anything that is not a dict is treated
            as carrying no business code and no payload.

    Returns:
        A failed ``Result`` when either the HTTP code or the business code is
        not 200. Otherwise a successful ``Result`` whose ``data`` is the
        payload; when the payload is a dict containing ``list``, the
        ``list`` / ``count`` / ``is_end_page`` fields are filled from it.
    """
    is_envelope = isinstance(raw, dict)
    biz_code = raw.get("statusCode") if is_envelope else http_code

    if http_code != 200 or biz_code != 200:
        # Prefer the business code; fall back to the HTTP code when the
        # envelope has none. The fallback message quotes this same code so
        # it never reads "None" and always agrees with ``status_code``.
        code = biz_code or http_code
        detail = (
            raw.get("statusDescription") or raw.get("message")
        ) if is_envelope else None
        return Result(
            success=False,
            status_code=code,
            error=detail or f"请求失败: {code}",
        )

    payload = (raw.get("data") or {}) if is_envelope else {}
    if isinstance(payload, dict) and "list" in payload:
        return Result(
            success=True,
            status_code=200,
            data=payload,
            # ``dict.get(key, default)`` only applies the default when the key
            # is missing, so an explicit JSON null must be normalised here.
            list=payload.get("list") or [],
            count=payload.get("count") or 0,
            is_end_page=payload.get("isEndPage", True),
        )
    return Result(success=True, status_code=200, data=payload)
# ─────────────────────────────────────────────
# 基础 Fetcher(GET 详情类)
# ─────────────────────────────────────────────
class BaseFetcher:
    """Skeleton for fetchers that issue one GET request and return one Result.

    Subclasses must implement ``_build_params()`` and ``_parse()``.
    ``_build_headers()`` and ``_check_blocked()`` are optional hooks with
    harmless defaults.
    """

    ENDPOINT: str = ""

    def __init__(self, http_client: HTTPClient) -> None:
        """Keep the HTTP client that ``fetch()`` sends its request through."""
        self.http_client = http_client

    # --- Hooks every subclass has to provide ---

    def _build_params(self) -> dict:
        """Return the parameters for the request; subclasses must override."""
        raise NotImplementedError(f"{type(self).__name__} must implement _build_params()")

    def _parse(self, http_code: int, raw: Any) -> Result:
        """Convert the response into a Result; subclasses must override."""
        raise NotImplementedError(f"{type(self).__name__} must implement _parse()")

    # --- Hooks with a default implementation ---

    def _build_headers(self) -> dict:
        """Return extra headers for the request.

        The default is an empty dict, in which case ``fetch()`` passes
        ``headers=None`` to the client.
        """
        return {}

    def _check_blocked(self, status_code: int, body: str) -> bool:
        """Tell whether the response is an anti-crawl block.

        Args:
            status_code: HTTP status code of the response.
            body: The response as text (non-string responses are passed
                through ``str()`` first).

        Returns:
            Always False here; override to recognise platform block signals.
        """
        return False

    # --- Request pipeline ---

    def fetch(self) -> Result:
        """Run one fetch: params, headers, GET, block check, then parse.

        Returns:
            A failed Result with ``error="blocked"`` when ``_check_blocked()``
            reports a block, otherwise whatever ``_parse()`` returns.
        """
        query = self._build_params()
        headers = self._build_headers() or None
        http_code, raw = self.http_client.get(
            self.ENDPOINT, params=query, headers=headers
        )

        # The block check works on text, while the parser gets the raw value.
        body_text = raw if isinstance(raw, str) else str(raw)
        if not self._check_blocked(http_code, body_text):
            return self._parse(http_code, raw)
        return Result(success=False, status_code=http_code, error="blocked")
# ─────────────────────────────────────────────
# 基础 Searcher(搜索 + 分页类)
# ─────────────────────────────────────────────
class BaseSearcher:
    """Template-method base class for paginated list searchers.

    Required overrides: ``_build_params()``, ``_parse()``.
    Optional overrides: ``_build_headers()``, ``_check_blocked()``.
    """

    ENDPOINT: str = ""

    def __init__(self, page_size: int = 15, http_client: Optional[HTTPClient] = None) -> None:
        """Store the paging and transport configuration.

        Args:
            page_size: Number of items per page. Not read by this base class;
                presumably consumed by a subclass's ``_build_params()``.
            http_client: Client used for every request. No default client is
                created here, so one must be set before ``search()`` runs.
        """
        self.page_size = page_size
        self.http_client = http_client

    # --- Required template methods ---

    def _build_params(self, page_index: int) -> dict:
        """Build the query parameters for page ``page_index``. MUST be overridden."""
        raise NotImplementedError(f"{type(self).__name__} must implement _build_params()")

    def _parse(self, http_code: int, raw: Any) -> Result:
        """Parse the HTTP response into a Result. MUST be overridden."""
        raise NotImplementedError(f"{type(self).__name__} must implement _parse()")

    # --- Optional template methods ---

    def _build_headers(self) -> dict:
        """Build extra request headers.

        Default: returns ``{}``, which ``_request()`` forwards as
        ``headers=None``.
        """
        return {}

    def _check_blocked(self, status_code: int, body: str) -> bool:
        """Detect platform-specific anti-crawl blocks.

        Args:
            status_code: HTTP status code of the response.
            body: The response as text (non-string responses are passed
                through ``str()`` first).

        Returns:
            False by default (assume not blocked); override to inspect the
            status/body for block signals.
        """
        return False

    # --- Orchestration ---

    def _request(self, params: dict) -> tuple[int, Any]:
        """Send one GET to ``ENDPOINT`` and return the client's ``(code, raw)`` pair."""
        extra_headers = self._build_headers()
        return self.http_client.get(
            self.ENDPOINT, params=params, headers=extra_headers or None
        )

    def search(self, page_index: int = 1) -> Result:
        """Fetch a single page: build params, request, check blocked, parse.

        Returns:
            A failed Result with ``error="blocked"`` when ``_check_blocked()``
            reports a block, otherwise whatever ``_parse()`` returns.
        """
        params = self._build_params(page_index)
        http_code, raw = self._request(params)
        # The block check works on text, while the parser gets the raw value.
        raw_str = raw if isinstance(raw, str) else str(raw)
        if self._check_blocked(http_code, raw_str):
            return Result(success=False, status_code=http_code, error="blocked")
        return self._parse(http_code, raw)

    def load_all(self, max_pages: int = 10, on_page=None) -> list:
        """Collect items page by page, starting at page 1.

        Paging stops at the first failed page (a warning is logged and the
        items gathered so far are kept), at the first page whose
        ``is_end_page`` is true, or after ``max_pages`` pages.

        Args:
            max_pages: Upper bound on the number of pages requested.
            on_page: Optional callable invoked as ``on_page(page_index,
                result)`` after each successful page has been collected.

        Returns:
            The items of all successful pages, in page order.
        """
        all_items: list = []
        for page_index in range(1, max_pages + 1):
            result = self.search(page_index)
            if not result.success:
                _logger.warning("%d 页失败: %s", page_index, result.error)
                break
            # A parser may leave ``list`` as None (e.g. a JSON null); treat it
            # as an empty page instead of letting ``extend`` raise TypeError.
            all_items.extend(result.list or [])
            if on_page:
                on_page(page_index, result)
            if result.is_end_page:
                break
        return all_items