""" runner.company_loop - 独立公司爬虫主循环 从后端 pending_company 队列获取待爬取公司, 逐个调用平台 API 获取公司详情并上传。 """ from __future__ import annotations import os import time import traceback from typing import Any, Callable from spiderJobs.core.base import BaseFetcher from spiderJobs.runner.api_client import RunnerAPIClient from spiderJobs.runner.loop import sleep_random def run_company_loop( *, platform: str, create_company_fetcher: Callable[[str, Any], BaseFetcher], create_client_fn: Callable[..., Any], batch_size: int = 10, sleep_min: float = 10, sleep_max: float = 20, api_base_url: str = "", client_kwargs: dict | None = None, ) -> None: """独立公司爬虫主循环 Args: platform: 平台标识 (boss/qcwy/zhilian) create_company_fetcher: 工厂函数 (company_id, http_client) -> BaseFetcher create_client_fn: 平台 HTTP client 工厂 batch_size: 每批获取待处理公司数量 sleep_min/max: 请求间随机延迟范围(秒) api_base_url: 后端 API 地址 client_kwargs: 传给 create_client_fn 的额外参数 """ batch_size = int(os.environ.get("COMPANY_BATCH_SIZE", str(batch_size))) sleep_min = float(os.environ.get("SLEEP_MIN_SECONDS", str(sleep_min))) sleep_max = float(os.environ.get("SLEEP_MAX_SECONDS", str(sleep_max))) api = RunnerAPIClient( base_url=api_base_url, platform=platform, ) print(f"[{platform}-company] 公司爬虫启动 | crawler_id={api.crawler_id}") print(f"[{platform}-company] API: {api.base_url} | batch={batch_size} | delay={sleep_min}-{sleep_max}s") http_client = create_client_fn(**(client_kwargs or {})) while True: try: # 1. 获取待爬取公司列表 companies = api.fetch_pending_companies(limit=batch_size, status="pending") if not companies: print(f"[{platform}-company] 无待处理公司,等待 120s ...") time.sleep(120) continue print(f"\n[{platform}-company] 获取到 {len(companies)} 个待处理公司") # 2. 逐个爬取 success_count = 0 fail_count = 0 for company in companies: company_id = company.get("company_id", "") company_name = company.get("company_name", "") if not company_id: continue sleep_random(sleep_min, sleep_max) try: fetcher = create_company_fetcher(company_id, http_client) result = fetcher.fetch() if result.success and result.data: # 上传公司数据 data_to_upload = result.data if isinstance(result.data, dict) else {"raw": result.data} api.upload_data([data_to_upload], data_type="company") # 标记完成 api.update_company_status(company_id, status="done") success_count += 1 print(f" [OK] {company_name or company_id}") else: api.update_company_status( company_id, status="failed", error_message=result.error or "empty data", ) fail_count += 1 print(f" [FAIL] {company_name or company_id}: {result.error}") except Exception as e: api.update_company_status( company_id, status="failed", error_message=str(e)[:500], ) fail_count += 1 print(f" [ERROR] {company_name or company_id}: {e}") print( f"[{platform}-company] 批次完成: 成功={success_count} 失败={fail_count}" ) except KeyboardInterrupt: print(f"\n[{platform}-company] 收到中断信号,退出...") break except Exception as e: print(f"[{platform}-company] 主循环异常: {e}") traceback.print_exc() time.sleep(30)