feat: 爬虫优化 — company_desc 补全、Boss详情获取、URL修复

- 新增 company_enrichment.py: job 入库时自动补全 company_desc
  (优先查 MySQL,fallback 调平台 API 获取并入库)
- Boss 爬虫: 搜索列表后逐条调 batch 详情接口拿完整数据
  (jobBaseInfoVO/brandComInfoVO),每条获取后立即上报
- Boss push_mapper: 兼容新旧两种 API 格式(扁平/嵌套VO)
- Boss token: 启动时自动从后端 API 读取数据库中的 mpt/wt2
- Boss client: header 值 strip 防止空格导致请求失败
- qcwy URL: 用 jobId/coId 拼接 jobs.51job.com 格式
- 三个平台 max_pages 默认改为 100
This commit is contained in:
win 2026-03-22 21:54:19 +08:00
parent a177516c23
commit 24918a272b
9 changed files with 687 additions and 31 deletions

View File

@ -0,0 +1,176 @@
"""
Job 入库时自动补全 company_desc
每个平台独立处理
- 从原始 job 数据提取 source_company_id各平台 key 不同
- 批量查 MySQL {Platform}Company
- 找到 直接填入 company_desc
- 没找到 调平台 API 获取公司详情 写入 MySQL 回填 company_desc
"""
from __future__ import annotations
import asyncio
from typing import Any, Callable, Dict, List, Optional
from app.log import logger
from app.services.company_storage import company_storage, normalize_company_id
# ─────────────────────────────────────────────
# 各平台:从 raw job data 提取 source_company_id
# ─────────────────────────────────────────────
def _zhilian_company_id(raw: Dict[str, Any]) -> Optional[str]:
"""智联用 companyNumber (如 CZL1227180200)"""
return raw.get("companyNumber") or raw.get("rootCompanyNumber")
def _boss_company_id(raw: Dict[str, Any]) -> Optional[str]:
"""Boss 用 encryptBrandId"""
brand = raw.get("brandComInfoVO") or {}
val = brand.get("encryptBrandId")
if val:
return str(val)
bid = raw.get("brandId")
return str(bid) if bid else None
def _qcwy_company_id(raw: Dict[str, Any]) -> Optional[str]:
"""前程无忧用 coId"""
val = raw.get("coId")
return str(val) if val else None
_EXTRACTORS: Dict[str, Callable[[Dict[str, Any]], Optional[str]]] = {
"zhilian": _zhilian_company_id,
"boss": _boss_company_id,
"qcwy": _qcwy_company_id,
}
# 单批次最大 API 调用数,防止拖慢入库
_MAX_API_CALLS_PER_BATCH = 5
# ─────────────────────────────────────────────
# 主入口
# ─────────────────────────────────────────────
async def enrich_company_desc(
platform: str,
push_data_list: List[Dict[str, Any]],
) -> int:
"""对 push_data_list 中 company_desc 为空的记录补全公司简介
直接修改 push_data_list 中的 dict
Returns: 补全成功的数量
"""
extractor = _EXTRACTORS.get(platform)
if not extractor:
return 0
# 1. 找出需要补全的记录
need_enrich: List[tuple[int, str]] = [] # (index_in_list, normalized_company_id)
for i, item in enumerate(push_data_list):
if item.get("company_desc"):
continue
raw = item.get("base_data") or {}
cid = extractor(raw)
if not cid:
continue
normalized = normalize_company_id(platform, cid)
if normalized:
need_enrich.append((i, normalized))
if not need_enrich:
return 0
# 2. 去重,批量查 MySQL
unique_ids = list({cid for _, cid in need_enrich})
desc_map = await _batch_lookup_mysql(platform, unique_ids)
# 3. MySQL 中没有的公司 → 调 API 获取(限制数量)
missing_ids = [cid for cid in unique_ids if cid not in desc_map]
if missing_ids:
api_ids = missing_ids[:_MAX_API_CALLS_PER_BATCH]
if len(missing_ids) > _MAX_API_CALLS_PER_BATCH:
logger.info(
f"[enrichment] {platform} 待获取公司 {len(missing_ids)} 个,"
f"本批次限制 {_MAX_API_CALLS_PER_BATCH}"
)
fetched = await _fetch_and_store_companies(platform, api_ids)
desc_map.update(fetched)
# 4. 回填 company_desc
enriched = 0
for idx, cid in need_enrich:
desc = desc_map.get(cid)
if desc:
push_data_list[idx]["company_desc"] = desc
enriched += 1
if enriched:
logger.info(f"[enrichment] {platform} 补全 company_desc: {enriched}/{len(need_enrich)}")
return enriched
# ─────────────────────────────────────────────
# MySQL 批量查询
# ─────────────────────────────────────────────
async def _batch_lookup_mysql(
platform: str,
company_ids: List[str],
) -> Dict[str, str]:
"""批量查 MySQL {Platform}Company 表,返回 {source_company_id: description}"""
if not company_ids:
return {}
model = company_storage.company_model(platform)
rows = await model.filter(
source_company_id__in=company_ids,
).values_list("source_company_id", "description")
return {
str(sid): str(desc)
for sid, desc in rows
if desc
}
# ─────────────────────────────────────────────
# API fallback获取公司详情并写入 MySQL
# ─────────────────────────────────────────────
async def _fetch_and_store_companies(
platform: str,
company_ids: List[str],
) -> Dict[str, str]:
"""逐个调平台 API 获取公司详情,写入 MySQL返回 {company_id: description}"""
# 延迟导入避免循环依赖
from app.services.company_cleaner import company_cleaner
result: Dict[str, str] = {}
for cid in company_ids:
try:
# boss 需要先加载 token
if platform == "boss":
await company_cleaner._ensure_boss_token_loaded()
data = await company_cleaner._fetch_company_data(platform, cid)
if not data:
continue
upsert_result = await company_storage.upsert_company(
platform, data, company_id=cid,
)
record = upsert_result.get("record")
if record and record.description:
result[cid] = record.description
logger.info(f"[enrichment] {platform} 公司 {cid} 从 API 获取成功")
except Exception as e:
logger.warning(f"[enrichment] {platform} 公司 {cid} API 获取失败: {e}")
return result

View File

@ -5,45 +5,83 @@ from app.services.ingest.remote_push import safe_get, safe_join
def _extract_job_id(data: Dict[str, Any]) -> Optional[str]:
job_base = data.get("jobBaseInfoVO", {})
val = job_base.get("jobId") if job_base else None
# 新格式: encryptJobId 在顶层
# 旧格式: jobBaseInfoVO.jobId
val = data.get("encryptJobId")
if not val:
job_base = data.get("jobBaseInfoVO") or {}
val = job_base.get("jobId") or job_base.get("encryptJobId")
return str(val) if val else None
def _extract_company_name(data: Dict[str, Any]) -> Optional[str]:
name = data.get("name") or (data.get("companyFullInfoVO") or {}).get("name")
name = data.get("brandName") or data.get("name") or (data.get("companyFullInfoVO") or {}).get("name")
return str(name) if name else None
def _build_boss_push(data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
# 兼容新旧两种 API 格式
# 旧格式: brandComInfoVO / jobBaseInfoVO / bossBaseInfoVO 嵌套
# 新格式: 扁平结构,字段直接在顶层
boss_base = data.get("bossBaseInfoVO") or {}
job_base = data.get("jobBaseInfoVO") or {}
brand = data.get("brandComInfoVO") or {}
# 职位信息优先新格式顶层字段fallback 旧格式嵌套字段
title = data.get("jobName") or safe_get(job_base, "positionName")
description = data.get("jobDesc") or safe_get(job_base, "jobDesc")
education = data.get("jobDegree") or safe_get(job_base, "degreeName")
years = data.get("jobExperience") or safe_get(job_base, "experienceName")
skills = data.get("skills") or job_base.get("requiredSkills") or []
welfares = data.get("welfares") or job_base.get("salaryWelfareInfo") or []
encrypt_job_id = data.get("encryptJobId") or safe_get(job_base, "encryptJobId")
# 薪资:新格式用 salaryDesc旧格式用 lowSalary-highSalary
salary = data.get("salaryDesc") or ""
if not salary:
low = safe_get(job_base, "lowSalary")
high = safe_get(job_base, "highSalary")
salary = f"{low}-{high}" if low or high else ""
# 位置:新格式用 cityName/districtName/businessName旧格式用 locationName/locationDesc
city = data.get("cityName") or ""
district = data.get("districtName") or ""
business = data.get("businessName") or ""
location = f"{city}{district}{business}".strip() or safe_get(job_base, "locationName", "位置信息未找到")
position = location or safe_get(job_base, "locationDesc", "位置信息未找到")
# 公司信息优先新格式顶层字段fallback 旧格式嵌套字段
brand_name = data.get("brandName") or safe_get(brand, "brandName")
brand_scale = data.get("brandScale") or safe_get(brand, "scaleName")
brand_stage = data.get("brandStage") or safe_get(brand, "stageName")
encrypt_brand_id = safe_get(brand, "encryptBrandId")
brand_industry = safe_get(brand, "industryName")
return {
"source_type": "Boss直聘",
"name": safe_get(brand, "brandName"),
"common_name": safe_get(boss_base, "brandName"),
"title": safe_get(job_base, "positionName"),
"title_addr": safe_get(job_base, "positionName"),
"description": safe_get(job_base, "jobDesc"),
"education": safe_get(job_base, "degreeName"),
"skill": safe_join(job_base.get("requiredSkills")),
"welfare": safe_join(job_base.get("salaryWelfareInfo")),
"years": safe_get(job_base, "experienceName"),
"salary": f'{safe_get(job_base, "lowSalary")}-{safe_get(job_base, "highSalary")}',
"location": safe_get(job_base, "locationName", "位置信息未找到"),
"position": safe_get(job_base, "locationDesc", "位置信息未找到"),
"name": brand_name,
"common_name": brand_name or safe_get(boss_base, "brandName"),
"title": title,
"title_addr": title,
"description": description,
"education": education,
"skill": safe_join(skills),
"welfare": safe_join(welfares),
"years": years,
"salary": salary,
"location": location,
"position": position,
"job_type": "全职",
"size": safe_get(brand, "scaleName"),
"size": brand_scale,
"employer_type": "全职",
"industry": safe_get(brand, "industryName"),
"industry": brand_industry,
"job_1st_class": "", "job_2nd_class": "", "job_3rd_class": "", "job_4th_class": "",
"date": "", "start_date": "", "end_date": "",
"age": "", "sex": "", "number": "",
"url": f"https://www.zhipin.com/job_detail/{safe_get(job_base, 'encryptJobId')}.html",
"company_id": safe_get(brand, "encryptBrandId"),
"company_name": safe_get(brand, "brandName"),
"company_url": f"https://www.zhipin.com/gongsi/{safe_get(brand, 'encryptBrandId')}.html",
"url": f"https://www.zhipin.com/job_detail/{encrypt_job_id}.html" if encrypt_job_id else "",
"company_id": encrypt_brand_id,
"company_name": brand_name,
"company_url": f"https://www.zhipin.com/gongsi/{encrypt_brand_id}.html" if encrypt_brand_id else "",
"company_desc": safe_get(brand, "introduce"),
"base_data": data,
}

View File

@ -45,6 +45,11 @@ def _build_qcwy_push(data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
raw_area = f"{city_str}{landmark_str}".strip()
area_val = raw_area or "位置信息未找到"
job_id = data.get("jobId") or ""
co_id = data.get("coId") or ""
job_url = data.get("jobHref") or (f"https://jobs.51job.com/all/{job_id}.html" if job_id else "")
company_url = data.get("companyHref") or (f"https://jobs.51job.com/all/co{co_id}.html" if co_id else "")
return {
"source_type": "前程无忧",
"name": data.get("companyName"),
@ -67,10 +72,10 @@ def _build_qcwy_push(data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"employer_type": data.get("companyTypeString"),
"industry": f'{data.get("major1Str", "")}-{data.get("major2Str", "")}',
"job_1st_class": "", "job_2nd_class": "", "job_3rd_class": "", "job_4th_class": "",
"url": data.get("jobHref"),
"company_id": data.get("coId"),
"url": job_url,
"company_id": co_id,
"company_name": data.get("fullCompanyName"),
"company_url": data.get("companyHref"),
"company_url": company_url,
"company_desc": data.get("company_desc", ""),
"base_data": data,
}

View File

@ -0,0 +1,160 @@
import json
from typing import Dict, Any, List, Optional
from clickhouse_connect.driver import AsyncClient
from app.log import logger
from app.services.ingest.registry import get_config, list_configs, PlatformConfig
from app.services.ingest.company_enrichment import enrich_company_desc
from app.services.ingest.dedup import build_insert_row, batch_dedup_filter
from app.services.ingest.remote_push import batch_push_to_remote
class IngestService:
"""统一数据入库服务"""
def __init__(self, clickhouse_client: AsyncClient):
self.client = clickhouse_client
async def store_batch(
self,
platform: str,
channel: str,
data_type: str,
data_list: List[Dict[str, Any]],
check_duplicate: bool = True,
) -> Dict[str, Any]:
results: Dict[str, Any] = {
"total": len(data_list), "success": 0, "failed": 0, "duplicate": 0, "errors": [],
}
if not data_list:
return results
config = get_config(platform, channel, data_type)
table = f"job_data.{config.table}"
# 准备所有行
all_columns: Optional[List[str]] = None
all_values: List[List[Any]] = []
push_data_list: List[Dict[str, Any]] = []
for i, data in enumerate(data_list):
try:
columns, values = build_insert_row(config, data, channel)
all_columns = all_columns or columns
all_values.append(values)
if config.push_mapper and data_type == "job":
push_item = config.push_mapper(data)
if push_item:
push_data_list.append(push_item)
except Exception as e:
results["failed"] += 1
results["errors"].append({"index": i, "error": str(e)})
if not all_values or all_columns is None:
return results
# 批量去重
if check_duplicate:
try:
filtered, ignored = await batch_dedup_filter(
self.client, config, data_list, all_columns, all_values,
)
all_values = filtered
results["duplicate"] = ignored
except Exception as e:
logger.error(f"批量去重失败: {e}")
# 批量插入
if all_values:
try:
await self.client.insert(table, all_values, column_names=all_columns)
results["success"] = len(all_values)
except Exception as e:
logger.error(f"批量插入失败: {e}")
results["failed"] += len(all_values)
results["errors"].append({"error": f"批量插入失败: {e}"})
# 异步远程推送(不影响主结果)
if push_data_list:
# 补全 company_desc优先 MySQLfallback 平台 API
try:
await enrich_company_desc(platform, push_data_list)
except Exception as e:
logger.warning(f"company_desc 补全异常: {e}")
try:
await batch_push_to_remote(push_data_list)
except Exception as e:
logger.warning(f"批量推送失败: {e}")
return results
async def store_single(
self,
platform: str,
channel: str,
data_type: str,
data: Dict[str, Any],
check_duplicate: bool = True,
) -> Dict[str, Any]:
return await self.store_batch(platform, channel, data_type, [data], check_duplicate)
async def query_data(
self,
platform: str,
channel: str,
data_type: str,
limit: int = 100,
offset: int = 0,
) -> Dict[str, Any]:
config = get_config(platform, channel, data_type)
table = f"job_data.{config.table}"
try:
count_result = await self.client.query(f"SELECT count() FROM {table}")
total = count_result.result_rows[0][0] if count_result.result_rows else 0
query = f"SELECT * FROM {table} ORDER BY created_at DESC LIMIT {limit} OFFSET {offset}"
result = await self.client.query(query)
data_rows = []
for row in result.result_rows:
item = dict(zip(result.column_names, row))
if "json_data" in item and isinstance(item["json_data"], str):
try:
parsed = json.loads(item["json_data"])
if isinstance(parsed, dict):
item.update(parsed)
except Exception:
pass
data_rows.append(item)
return {"success": True, "data": data_rows, "count": total, "table_name": config.table}
except Exception as e:
logger.error(f"查询失败: {e}")
return {"success": False, "message": str(e), "error": str(e)}
@staticmethod
def get_registry_info() -> Dict[str, Any]:
configs = list_configs()
platforms = sorted({c.platform for c in configs})
channels = sorted({c.channel for c in configs})
data_types = sorted({c.data_type for c in configs})
entries = [
{
"platform": c.platform,
"channel": c.channel,
"data_type": c.data_type,
"table": c.table,
"dedup_columns": c.dedup_columns,
}
for c in configs
]
return {
"platforms": platforms,
"channels": channels,
"data_types": data_types,
"entries": entries,
}

View File

@ -74,8 +74,8 @@ class BossClient(HTTPClient):
def _boss_headers(self) -> dict:
"""构造每次请求需要动态更新的 Boss 请求头"""
return {
"mpt": self.signer.mpt,
"wt2": self.signer.wt2,
"mpt": (self.signer.mpt or "").strip(),
"wt2": (self.signer.wt2 or "").strip(),
"Traceid": BossSign.generate_traceid("M-W"),
}

View File

@ -33,7 +33,7 @@ if _project_root not in sys.path:
sys.path.insert(0, _project_root)
from crawler_core.base import BaseFetcher, BaseSearcher
from spiderJobs.platforms.boss.api import GetBrandDetail, SearchRecJobs
from spiderJobs.platforms.boss.api import GetBrandDetail, GetJobDetail, SearchRecJobs
from spiderJobs.platforms.boss.client import BossClient, create_client
from crawler_core.boss.sign import BossSign
from spiderJobs.runner.loop import run_crawl_loop
@ -91,10 +91,51 @@ def create_company_fetcher(company_id: str, http_client: BossClient) -> BaseFetc
return GetBrandDetail(brand_id=company_id, client=http_client)
def enrich_boss_job(job: dict, http_client: BossClient) -> Optional[dict]:
"""通过 batch 详情接口获取完整 job + 公司信息
列表接口只返回基本字段详情接口返回完整的
jobBaseInfoVO / brandComInfoVO / bossBaseInfoVO 等嵌套数据
"""
security_id = job.get("securityId", "")
encrypt_job_id = job.get("encryptJobId", "")
lid = job.get("lid", "")
if not security_id or not encrypt_job_id:
return None
result = GetJobDetail(
security_id=security_id,
job_id=encrypt_job_id,
lid=lid,
client=http_client,
).fetch()
if not result.success or not result.data:
print(f" [详情] {encrypt_job_id} 获取失败: {result.error}")
return None
detail = result.data.get("detail") or {}
print(f" [详情] {encrypt_job_id} 获取成功")
return detail
def main():
# 优先环境变量,没有则从后端 API 读取数据库中的 token
mpt = os.environ.get("BOSS_MPT", "")
wt2 = os.environ.get("BOSS_WT2", "")
if not mpt:
from spiderJobs.runner.api_client import RunnerAPIClient
api = RunnerAPIClient(platform="boss")
token_data = api.fetch_token()
if token_data:
mpt = (token_data.get("mpt") or "").strip()
wt2 = (token_data.get("wt2") or "").strip()
print(f"[boss] 从后端获取 Token 成功: mpt={mpt[:10]}...")
else:
print("[boss] 警告: 未获取到 Token签名可能失败")
client_kwargs = {}
if mpt or wt2:
signer = BossSign(mpt=mpt, wt2=wt2)
@ -114,11 +155,12 @@ def main():
platform="boss",
create_searcher=create_searcher,
create_client_fn=create_client,
max_pages=3,
max_pages=100,
data_type="job",
client_kwargs=client_kwargs,
extract_company_id=extract_company_id,
create_company_fetcher=create_company_fetcher,
enrich_job=enrich_boss_job,
)

View File

@ -104,7 +104,7 @@ def main():
platform="qcwy",
create_searcher=create_searcher,
create_client_fn=create_client,
max_pages=3,
max_pages=100,
data_type="job",
client_kwargs=client_kwargs,
extract_company_id=extract_company_id,

View File

@ -91,7 +91,6 @@ def create_company_fetcher(company_id: str, http_client: ZhilianClient) -> BaseF
def main():
client_kwargs = {}
proxy = os.environ.get("PROXY_URL", "")
if proxy:
client_kwargs["proxy"] = proxy
@ -100,7 +99,7 @@ def main():
platform="zhilian",
create_searcher=create_searcher,
create_client_fn=create_cgate_client,
max_pages=3,
max_pages=100,
data_type="job",
client_kwargs=client_kwargs,
extract_company_id=extract_company_id,

236
spiderJobs/runner/loop.py Normal file
View File

@ -0,0 +1,236 @@
"""
runner.loop - 通用爬虫主循环
提供 run_crawl_loop() 作为所有平台的统一入口
各平台只需提供 create_searcher(keyword, client) 工厂函数
可选传入 extract_company_id / create_company_fetcher
实现搜索 job 时顺带抓取公司详情维度1
"""
from __future__ import annotations
import os
import random
import time
import traceback
from typing import Any, Callable, Optional
from spiderJobs.core.base import ApiResult, BaseFetcher, BaseSearcher
from spiderJobs.runner.api_client import RunnerAPIClient
def sleep_random(min_s: float = 10, max_s: float = 20) -> None:
"""反爬随机延迟"""
delay = random.uniform(min_s, max_s)
print(f"[延迟] 等待 {delay:.1f}s ...")
time.sleep(delay)
def _crawl_companies_from_jobs(
jobs: list[dict],
*,
extract_company_id: Callable[[dict], Optional[str]],
create_company_fetcher: Callable[[str, Any], BaseFetcher],
http_client: Any,
api: RunnerAPIClient,
seen_companies: set[str],
sleep_min: float,
sleep_max: float,
) -> None:
"""从 job 结果中提取公司 ID 并抓取公司详情(内联公司爬取)"""
new_ids: list[str] = []
for job in jobs:
cid = extract_company_id(job)
if cid and cid not in seen_companies:
seen_companies.add(cid)
new_ids.append(cid)
if not new_ids:
return
print(f" [公司] 发现 {len(new_ids)} 个新公司,开始抓取详情...")
ok = 0
for cid in new_ids:
sleep_random(sleep_min, sleep_max)
try:
fetcher = create_company_fetcher(cid, http_client)
result = fetcher.fetch()
if result.success and result.data:
data = result.data if isinstance(result.data, dict) else {"raw": result.data}
api.upload_data([data], data_type="company")
ok += 1
else:
print(f" [公司] {cid} 获取失败: {result.error}")
except Exception as e:
print(f" [公司] {cid} 异常: {e}")
print(f" [公司] 批次完成: {ok}/{len(new_ids)} 成功")
def run_crawl_loop(
*,
platform: str,
create_searcher: Callable[[dict, Any], BaseSearcher],
create_client_fn: Callable[..., Any],
max_pages: int = 3,
sleep_min: float = 10,
sleep_max: float = 20,
data_type: str = "job",
api_base_url: str = "",
client_kwargs: dict | None = None,
# ── 可选:内联公司爬取 ──
extract_company_id: Callable[[dict], Optional[str]] | None = None,
create_company_fetcher: Callable[[str, Any], BaseFetcher] | None = None,
# ── 可选获取详情enrichment ──
enrich_job: Callable[[dict, Any], Optional[dict]] | None = None,
) -> None:
"""通用爬虫主循环
Args:
platform: 平台标识 (boss/qcwy/zhilian)
create_searcher: 工厂函数 (keyword_dict, http_client) -> BaseSearcher
create_client_fn: 平台 HTTP client 工厂
max_pages: 每个关键词最大翻页数
sleep_min/max: 请求间随机延迟范围
data_type: 数据类型 (job/company)
api_base_url: 后端 API 地址
client_kwargs: 传给 create_client_fn 的额外参数
extract_company_id: 从单条 job dict 提取公司 ID (可选)
create_company_fetcher: 创建公司详情 fetcher (company_id, http_client) -> BaseFetcher (可选)
enrich_job: 从列表 job dict 获取完整详情 (job_dict, http_client) -> 详情 dict (可选)
"""
max_pages = int(os.environ.get("MAX_PAGES", str(max_pages)))
sleep_min = float(os.environ.get("SLEEP_MIN_SECONDS", str(sleep_min)))
sleep_max = float(os.environ.get("SLEEP_MAX_SECONDS", str(sleep_max)))
inline_company = bool(
extract_company_id and create_company_fetcher
and os.environ.get("INLINE_COMPANY", "1") != "0"
)
api = RunnerAPIClient(
base_url=api_base_url,
platform=platform,
)
print(f"[{platform}] 爬虫启动 | crawler_id={api.crawler_id}")
print(f"[{platform}] API: {api.base_url} | max_pages={max_pages} | delay={sleep_min}-{sleep_max}s")
if inline_company:
print(f"[{platform}] 内联公司爬取: 已启用 (INLINE_COMPANY=0 可关闭)")
http_client = create_client_fn(**(client_kwargs or {}))
# 会话级公司 ID 去重集合
seen_companies: set[str] = set()
while True:
try:
# 1. 获取关键词
keywords = api.fetch_keyword(limit=1)
if not keywords:
print(f"[{platform}] 无可用关键词,等待 60s 后重试...")
time.sleep(60)
continue
kw = keywords[0]
kw_id = kw["id"]
city = kw.get("city", "")
job = kw.get("job", "")
start_page = (kw.get("last_completed_page") or 0) + 1
print(f"\n[{platform}] 开始爬取: city={city} job={job} (id={kw_id}, 从第{start_page}页)")
# 2. 创建 searcher
searcher = create_searcher(kw, http_client)
# 3. 带断点续爬的分页抓取
all_jobs: list[dict] = []
last_page = start_page - 1
error_occurred = False
for page_index in range(start_page, start_page + max_pages):
sleep_random(sleep_min, sleep_max)
try:
result = searcher.search(page_index=page_index)
except Exception as e:
print(f"[{platform}] 第{page_index}页请求异常: {e}")
error_occurred = True
api.report_crawl_complete(
kw_id, status="failed", error_message=str(e)
)
break
if not result.success:
print(f"[{platform}] 第{page_index}页失败: {result.error}")
error_occurred = True
api.report_crawl_complete(
kw_id,
status="failed",
error_message=result.error or "unknown",
)
break
page_jobs = result.list
all_jobs.extend(page_jobs)
last_page = page_index
# 汇报进度
api.report_page_progress(
keyword_id=kw_id,
page=page_index,
jobs_found=len(page_jobs),
)
print(
f"[{platform}] 第{page_index}页: {len(page_jobs)}条 | "
f"累计: {len(all_jobs)}条 | is_end={result.is_end_page}"
)
# 上传本页数据(实时推送,不积攒)
if page_jobs:
if enrich_job:
# 逐条获取详情并立即上报,避免批量失败浪费
for j, job in enumerate(page_jobs):
try:
detail = enrich_job(job, http_client)
upload_job = detail if detail else job
except Exception as e:
print(f" [详情] 第{j+1}条获取失败: {e}")
upload_job = job
api.upload_data([upload_job], data_type=data_type)
if j < len(page_jobs) - 1:
sleep_random(sleep_min, sleep_max)
else:
api.upload_data(page_jobs, data_type=data_type)
# 内联公司爬取:从本页 job 中提取公司并抓取详情
if inline_company and page_jobs:
_crawl_companies_from_jobs(
page_jobs,
extract_company_id=extract_company_id,
create_company_fetcher=create_company_fetcher,
http_client=http_client,
api=api,
seen_companies=seen_companies,
sleep_min=sleep_min,
sleep_max=sleep_max,
)
if result.is_end_page:
break
# 4. 完成
if not error_occurred:
api.report_crawl_complete(kw_id, status="completed")
print(f"[{platform}] 关键词 {city}/{job} 完成,共{len(all_jobs)}")
except KeyboardInterrupt:
print(f"\n[{platform}] 收到中断信号,退出...")
break
except Exception as e:
print(f"[{platform}] 主循环异常: {e}")
traceback.print_exc()
time.sleep(30)