win 24918a272b feat: 爬虫优化 — company_desc 补全、Boss详情获取、URL修复
- 新增 company_enrichment.py: job 入库时自动补全 company_desc
  (优先查 MySQL,fallback 调平台 API 获取并入库)
- Boss 爬虫: 搜索列表后逐条调 batch 详情接口拿完整数据
  (jobBaseInfoVO/brandComInfoVO),每条获取后立即上报
- Boss push_mapper: 兼容新旧两种 API 格式(扁平/嵌套VO)
- Boss token: 启动时自动从后端 API 读取数据库中的 mpt/wt2
- Boss client: header 值 strip 防止空格导致请求失败
- qcwy URL: 用 jobId/coId 拼接 jobs.51job.com 格式
- 三个平台 max_pages 默认改为 100
2026-03-22 21:54:19 +08:00

237 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
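runner.loop - generic crawler main loop.
Provides run_crawl_loop() as the single entry point shared by all platforms.
Each platform only has to supply a create_searcher(keyword, client) factory function.
Optional: pass extract_company_id / create_company_fetcher to also
fetch company details (dimension 1) while searching for jobs.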
"""
from __future__ import annotations
import os
import random
import time
import traceback
from typing import Any, Callable, Optional
from spiderJobs.core.base import ApiResult, BaseFetcher, BaseSearcher
from spiderJobs.runner.api_client import RunnerAPIClient
def sleep_random(min_s: float = 10, max_s: float = 20) -> None:
    """Block for a random anti-scraping delay.

    A duration between ``min_s`` and ``max_s`` seconds is drawn with
    ``random.uniform``, announced on stdout, and then slept.

    Args:
        min_s: Lower bound of the delay, in seconds.
        max_s: Upper bound of the delay, in seconds.
    """
    pause = random.uniform(min_s, max_s)
    print("[延迟] 等待 {:.1f}s ...".format(pause))
    time.sleep(pause)
def _crawl_companies_from_jobs(
    jobs: list[dict],
    *,
    extract_company_id: Callable[[dict], Optional[str]],
    create_company_fetcher: Callable[[str, Any], BaseFetcher],
    http_client: Any,
    api: RunnerAPIClient,
    seen_companies: set[str],
    sleep_min: float,
    sleep_max: float,
) -> None:
    """Fetch and upload details for companies first seen in ``jobs``.

    Args:
        jobs: Job dicts from which company IDs are extracted.
        extract_company_id: Returns the company ID of a job dict, or a
            falsy value when the job carries none.
        create_company_fetcher: Builds a fetcher from
            ``(company_id, http_client)``; its ``fetch()`` result is
            read through ``success``, ``data`` and ``error``.
        http_client: Client object handed through to the fetcher factory.
        api: Backend client; ``upload_data`` receives each company.
        seen_companies: De-duplication set, mutated in place. IDs are
            added before the fetch is attempted, so an ID whose fetch
            fails is not retried while this set is reused.
        sleep_min: Lower bound of the delay before each fetch (seconds).
        sleep_max: Upper bound of the delay before each fetch (seconds).
    """
    # Pass 1: collect IDs not seen before, preserving first-seen order.
    pending: list[str] = []
    for record in jobs:
        company_id = extract_company_id(record)
        if not company_id or company_id in seen_companies:
            continue
        seen_companies.add(company_id)
        pending.append(company_id)

    total = len(pending)
    if total == 0:
        return

    print(f" [公司] 发现 {total} 个新公司,开始抓取详情...")

    # Pass 2: fetch each company; a failure on one never aborts the batch.
    succeeded = 0
    for company_id in pending:
        sleep_random(sleep_min, sleep_max)
        try:
            outcome = create_company_fetcher(company_id, http_client).fetch()
            if not (outcome.success and outcome.data):
                print(f" [公司] {company_id} 获取失败: {outcome.error}")
                continue
            payload = outcome.data
            if not isinstance(payload, dict):
                # Non-dict payloads are wrapped so the upload is always a dict.
                payload = {"raw": payload}
            api.upload_data([payload], data_type="company")
            succeeded += 1
        except Exception as exc:
            print(f" [公司] {company_id} 异常: {exc}")

    print(f" [公司] 批次完成: {succeeded}/{total} 成功")
def run_crawl_loop(
    *,
    platform: str,
    create_searcher: Callable[[dict, Any], BaseSearcher],
    create_client_fn: Callable[..., Any],
    max_pages: int = 3,
    sleep_min: float = 10,
    sleep_max: float = 20,
    data_type: str = "job",
    api_base_url: str = "",
    client_kwargs: dict | None = None,
    # -- optional: inline company crawling --
    extract_company_id: Callable[[dict], Optional[str]] | None = None,
    create_company_fetcher: Callable[[str, Any], BaseFetcher] | None = None,
    # -- optional: per-job detail enrichment --
    enrich_job: Callable[[dict, Any], Optional[dict]] | None = None,
) -> None:
    """Run the generic crawl loop for one platform until interrupted.

    Each iteration fetches one keyword from the backend, pages through
    the search results starting after the keyword's
    ``last_completed_page``, uploads every page as soon as it is
    fetched, and reports page progress and final status to the backend.

    The environment variables ``MAX_PAGES``, ``SLEEP_MIN_SECONDS`` and
    ``SLEEP_MAX_SECONDS`` override the matching arguments when set.
    Inline company crawling is enabled only when both company callbacks
    are supplied and ``INLINE_COMPANY`` is not ``"0"``.

    Args:
        platform: Platform identifier (boss/qcwy/zhilian).
        create_searcher: Factory ``(keyword_dict, http_client) -> BaseSearcher``.
        create_client_fn: Factory for the platform HTTP client.
        max_pages: Maximum number of pages fetched per keyword.
        sleep_min: Lower bound of the random delay between requests (seconds).
        sleep_max: Upper bound of the random delay between requests (seconds).
        data_type: Data type passed to the upload call (job/company).
        api_base_url: Backend API base URL.
        client_kwargs: Extra keyword arguments for ``create_client_fn``.
        extract_company_id: Extracts a company ID from one job dict (optional).
        create_company_fetcher: Factory
            ``(company_id, http_client) -> BaseFetcher`` (optional).
        enrich_job: Turns a list-level job dict into a full detail dict,
            ``(job_dict, http_client) -> detail dict`` (optional).
    """
    max_pages = int(os.environ.get("MAX_PAGES", str(max_pages)))
    sleep_min = float(os.environ.get("SLEEP_MIN_SECONDS", str(sleep_min)))
    sleep_max = float(os.environ.get("SLEEP_MAX_SECONDS", str(sleep_max)))
    inline_company = bool(
        extract_company_id and create_company_fetcher
        and os.environ.get("INLINE_COMPANY", "1") != "0"
    )

    api = RunnerAPIClient(
        base_url=api_base_url,
        platform=platform,
    )
    print(f"[{platform}] 爬虫启动 | crawler_id={api.crawler_id}")
    print(f"[{platform}] API: {api.base_url} | max_pages={max_pages} | delay={sleep_min}-{sleep_max}s")
    if inline_company:
        print(f"[{platform}] 内联公司爬取: 已启用 (INLINE_COMPANY=0 可关闭)")

    http_client = create_client_fn(**(client_kwargs or {}))

    # Session-wide set of company IDs already handled (shared across keywords).
    seen_companies: set[str] = set()

    while True:
        try:
            # 1. Fetch the next keyword to crawl.
            keywords = api.fetch_keyword(limit=1)
            if not keywords:
                print(f"[{platform}] 无可用关键词,等待 60s 后重试...")
                time.sleep(60)
                continue

            kw = keywords[0]
            kw_id = kw["id"]
            city = kw.get("city", "")
            job = kw.get("job", "")
            # Resume after the last page the backend recorded as completed.
            start_page = (kw.get("last_completed_page") or 0) + 1
            print(f"\n[{platform}] 开始爬取: city={city} job={job} (id={kw_id}, 从第{start_page}页)")

            # 2. Build the searcher for this keyword.
            searcher = create_searcher(kw, http_client)

            # 3. Paginated crawl with resume support.
            # Only the running count is needed; pages are uploaded
            # immediately, so the jobs themselves are not accumulated.
            total_jobs = 0
            error_occurred = False
            for page_index in range(start_page, start_page + max_pages):
                sleep_random(sleep_min, sleep_max)
                try:
                    result = searcher.search(page_index=page_index)
                except Exception as e:
                    print(f"[{platform}] 第{page_index}页请求异常: {e}")
                    error_occurred = True
                    api.report_crawl_complete(
                        kw_id, status="failed", error_message=str(e)
                    )
                    break

                if not result.success:
                    print(f"[{platform}] 第{page_index}页失败: {result.error}")
                    error_occurred = True
                    api.report_crawl_complete(
                        kw_id,
                        status="failed",
                        error_message=result.error or "unknown",
                    )
                    break

                page_jobs = result.list
                total_jobs += len(page_jobs)

                # Report per-page progress to the backend.
                api.report_page_progress(
                    keyword_id=kw_id,
                    page=page_index,
                    jobs_found=len(page_jobs),
                )
                print(
                    f"[{platform}] 第{page_index}页: {len(page_jobs)}条 | "
                    f"累计: {total_jobs}条 | is_end={result.is_end_page}"
                )

                # Upload this page right away instead of batching pages.
                if page_jobs:
                    if enrich_job:
                        # Enrich and upload one job at a time so a single
                        # failure does not waste the rest of the page.
                        # NOTE: the loop variable must not be named `job`;
                        # that name holds the keyword's job title used in
                        # the completion log below.
                        for idx, page_job in enumerate(page_jobs):
                            try:
                                detail = enrich_job(page_job, http_client)
                                upload_job = detail if detail else page_job
                            except Exception as e:
                                print(f" [详情] 第{idx + 1}条获取失败: {e}")
                                # Fall back to the list-level record.
                                upload_job = page_job
                            api.upload_data([upload_job], data_type=data_type)
                            # No delay after the last job of the page.
                            if idx < len(page_jobs) - 1:
                                sleep_random(sleep_min, sleep_max)
                    else:
                        api.upload_data(page_jobs, data_type=data_type)

                # Inline company crawl: fetch details for companies that
                # appear in this page's jobs.
                if inline_company and page_jobs:
                    _crawl_companies_from_jobs(
                        page_jobs,
                        extract_company_id=extract_company_id,
                        create_company_fetcher=create_company_fetcher,
                        http_client=http_client,
                        api=api,
                        seen_companies=seen_companies,
                        sleep_min=sleep_min,
                        sleep_max=sleep_max,
                    )

                if result.is_end_page:
                    break

            # 4. Mark the keyword as completed unless a page failed
            # (failures were already reported inside the page loop).
            if not error_occurred:
                api.report_crawl_complete(kw_id, status="completed")
                print(f"[{platform}] 关键词 {city}/{job} 完成,共{total_jobs}")
        except KeyboardInterrupt:
            print(f"\n[{platform}] 收到中断信号,退出...")
            break
        except Exception as e:
            # Top-level boundary: log, back off, then continue with the
            # next loop iteration.
            print(f"[{platform}] 主循环异常: {e}")
            traceback.print_exc()
            time.sleep(30)