""" runner.api_client - 爬虫与后端 API 的通信层 提供关键词获取、进度汇报、数据上传等功能。 爬虫主循环通过此模块与后端交互,实现状态管理。 """ from __future__ import annotations import json import os import time import uuid from typing import Any, Optional import requests class RunnerAPIClient: """后端 API 客户端,负责关键词调度与数据上传""" def __init__( self, base_url: str = "", api_token: str = "dev", platform: str = "", crawler_id: str = "", ): self.base_url = ( base_url or os.environ.get("API_BASE_URL", "http://127.0.0.1:9999") ).rstrip("/") self.api_token = api_token or os.environ.get("API_TOKEN", "dev") self.platform = platform self.crawler_id = crawler_id or f"{platform}-{uuid.uuid4().hex[:8]}" self._session = requests.Session() self._session.headers.update({"token": self.api_token}) # ───────────────────────────────────────────── # 关键词调度 # ───────────────────────────────────────────── def fetch_keyword(self, limit: int = 1) -> list[dict]: """从后端获取可用关键词(自动原子锁定为 crawling 状态) 返回关键词列表,每个元素包含: id, city, job, last_completed_page, crawl_status """ resp = self._get( "/api/v1/keyword/available", params={ "source": self.platform, "limit": limit, "reserve": "true", "crawler_id": self.crawler_id, }, ) print(resp) if resp and resp.get("code") == 200: return resp.get("data", {}).get("items", []) return [] def report_page_progress( self, keyword_id: int, page: int, total_pages: int = 0, jobs_found: int = 0, ) -> dict: """汇报单页爬取进度""" return self._post( "/api/v1/keyword/page-progress", body={ "source": self.platform, "keyword_id": keyword_id, "page": page, "total_pages": total_pages, "jobs_found": jobs_found, }, ) def report_crawl_complete( self, keyword_id: int, status: str = "completed", error_message: str = "", ) -> dict: """汇报爬取完成或失败""" return self._post( "/api/v1/keyword/crawl-complete", body={ "source": self.platform, "keyword_id": keyword_id, "status": status, "error_message": error_message, }, ) # ───────────────────────────────────────────── # 数据上传 # ───────────────────────────────────────────── def upload_data( self, data_list: list[dict], data_type: str = "job", channel: str = "mini", ) -> dict: """批量上传数据到后端(异步入库)""" if not data_list: return {"code": 200, "message": "空数据跳过"} print( f"[上报] {self.platform}/{data_type} | " f"条数={len(data_list)} | channel={channel} | " f"目标={self.base_url}/api/v1/universal/data/batch-store-async" ) resp = self._post( "/api/v1/universal/data/batch-store-async", body={ "data_list": data_list, "data_type": data_type, "platform": self.platform, "channel": channel, }, ) code = resp.get("code", "?") msg = resp.get("msg") or resp.get("message", "") stored = resp.get("data", {}).get("stored", "") if isinstance(resp.get("data"), dict) else "" print(f"[上报] 响应: code={code} msg={msg} {f'stored={stored}' if stored else ''}") return resp # ───────────────────────────────────────────── # Token 管理(Boss 平台需要) # ───────────────────────────────────────────── def fetch_token(self) -> Optional[dict]: """获取可用的平台 Token""" resp = self._get( "/api/v1/token/tokens", params={"platform": self.platform}, ) if resp and resp.get("code") == 200: tokens = resp.get("data", []) return tokens[0] if tokens else None return None # ───────────────────────────────────────────── # 公司队列 # ───────────────────────────────────────────── def fetch_pending_companies( self, limit: int = 10, status: str = "pending", ) -> list[dict]: """从后端获取待爬取公司列表 返回列表,每个元素包含: source, company_id, company_name, status, error_msg """ resp = self._get( "/api/v1/cleaning/companies", params={ "source": self.platform, "status": status, "page_size": limit, }, ) if resp and resp.get("code") == 200: return resp.get("data", []) return [] def update_company_status( self, company_id: str, status: str = "done", error_message: str = "", ) -> dict: """更新公司爬取状态(done/failed)""" return self._post( "/api/v1/cleaning/update-company-status", body={ "source": self.platform, "company_id": company_id, "status": status, "error_message": error_message, }, ) # ───────────────────────────────────────────── # HTTP 底层 # ───────────────────────────────────────────── def _get(self, path: str, params: dict | None = None) -> dict: url = f"{self.base_url}{path}" for attempt in range(3): try: resp = self._session.get(url, params=params, timeout=15) return resp.json() except Exception as e: print(f"[API] GET {path} 第{attempt + 1}次失败: {e}") time.sleep(2 * (attempt + 1)) return {} def _post(self, path: str, body: dict) -> dict: url = f"{self.base_url}{path}" for attempt in range(3): try: resp = self._session.post(url, json=body, timeout=30) return resp.json() except Exception as e: print(f"[API] POST {path} 第{attempt + 1}次失败: {e}") time.sleep(2 * (attempt + 1)) return {}