From 24918a272b91f9e81406afa383a84da6408f6c77 Mon Sep 17 00:00:00 2001 From: win Date: Sun, 22 Mar 2026 21:54:19 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E7=88=AC=E8=99=AB=E4=BC=98=E5=8C=96=20?= =?UTF-8?q?=E2=80=94=20company=5Fdesc=20=E8=A1=A5=E5=85=A8=E3=80=81Boss?= =?UTF-8?q?=E8=AF=A6=E6=83=85=E8=8E=B7=E5=8F=96=E3=80=81URL=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 company_enrichment.py: job 入库时自动补全 company_desc (优先查 MySQL,fallback 调平台 API 获取并入库) - Boss 爬虫: 搜索列表后逐条调 batch 详情接口拿完整数据 (jobBaseInfoVO/brandComInfoVO),每条获取后立即上报 - Boss push_mapper: 兼容新旧两种 API 格式(扁平/嵌套VO) - Boss token: 启动时自动从后端 API 读取数据库中的 mpt/wt2 - Boss client: header 值 strip 防止空格导致请求失败 - qcwy URL: 用 jobId/coId 拼接 jobs.51job.com 格式 - 三个平台 max_pages 默认改为 100 --- app/services/ingest/company_enrichment.py | 176 ++++++++++++++++ app/services/ingest/configs/boss.py | 80 ++++++-- app/services/ingest/configs/qcwy.py | 11 +- app/services/ingest/service.py | 160 +++++++++++++++ spiderJobs/platforms/boss/client.py | 4 +- spiderJobs/platforms/boss/main.py | 46 ++++- spiderJobs/platforms/job51/main.py | 2 +- spiderJobs/platforms/zhilian/main.py | 3 +- spiderJobs/runner/loop.py | 236 ++++++++++++++++++++++ 9 files changed, 687 insertions(+), 31 deletions(-) create mode 100644 app/services/ingest/company_enrichment.py create mode 100644 app/services/ingest/service.py create mode 100644 spiderJobs/runner/loop.py diff --git a/app/services/ingest/company_enrichment.py b/app/services/ingest/company_enrichment.py new file mode 100644 index 0000000..304e661 --- /dev/null +++ b/app/services/ingest/company_enrichment.py @@ -0,0 +1,176 @@ +""" +Job 入库时自动补全 company_desc + +每个平台独立处理: + - 从原始 job 数据提取 source_company_id(各平台 key 不同) + - 批量查 MySQL {Platform}Company 表 + - 找到 → 直接填入 company_desc + - 没找到 → 调平台 API 获取公司详情 → 写入 MySQL → 回填 company_desc +""" + +from __future__ import annotations + +import asyncio +from typing import Any, Callable, Dict, List, Optional + +from app.log import logger +from app.services.company_storage import company_storage, normalize_company_id + + +# ───────────────────────────────────────────── +# 各平台:从 raw job data 提取 source_company_id +# ───────────────────────────────────────────── + +def _zhilian_company_id(raw: Dict[str, Any]) -> Optional[str]: + """智联用 companyNumber (如 CZL1227180200)""" + return raw.get("companyNumber") or raw.get("rootCompanyNumber") + + +def _boss_company_id(raw: Dict[str, Any]) -> Optional[str]: + """Boss 用 encryptBrandId""" + brand = raw.get("brandComInfoVO") or {} + val = brand.get("encryptBrandId") + if val: + return str(val) + bid = raw.get("brandId") + return str(bid) if bid else None + + +def _qcwy_company_id(raw: Dict[str, Any]) -> Optional[str]: + """前程无忧用 coId""" + val = raw.get("coId") + return str(val) if val else None + + +_EXTRACTORS: Dict[str, Callable[[Dict[str, Any]], Optional[str]]] = { + "zhilian": _zhilian_company_id, + "boss": _boss_company_id, + "qcwy": _qcwy_company_id, +} + +# 单批次最大 API 调用数,防止拖慢入库 +_MAX_API_CALLS_PER_BATCH = 5 + + +# ───────────────────────────────────────────── +# 主入口 +# ───────────────────────────────────────────── + +async def enrich_company_desc( + platform: str, + push_data_list: List[Dict[str, Any]], +) -> int: + """对 push_data_list 中 company_desc 为空的记录补全公司简介 + + 直接修改 push_data_list 中的 dict。 + Returns: 补全成功的数量 + """ + extractor = _EXTRACTORS.get(platform) + if not extractor: + return 0 + + # 1. 找出需要补全的记录 + need_enrich: List[tuple[int, str]] = [] # (index_in_list, normalized_company_id) + for i, item in enumerate(push_data_list): + if item.get("company_desc"): + continue + raw = item.get("base_data") or {} + cid = extractor(raw) + if not cid: + continue + normalized = normalize_company_id(platform, cid) + if normalized: + need_enrich.append((i, normalized)) + + if not need_enrich: + return 0 + + # 2. 去重,批量查 MySQL + unique_ids = list({cid for _, cid in need_enrich}) + desc_map = await _batch_lookup_mysql(platform, unique_ids) + + # 3. MySQL 中没有的公司 → 调 API 获取(限制数量) + missing_ids = [cid for cid in unique_ids if cid not in desc_map] + if missing_ids: + api_ids = missing_ids[:_MAX_API_CALLS_PER_BATCH] + if len(missing_ids) > _MAX_API_CALLS_PER_BATCH: + logger.info( + f"[enrichment] {platform} 待获取公司 {len(missing_ids)} 个," + f"本批次限制 {_MAX_API_CALLS_PER_BATCH} 个" + ) + fetched = await _fetch_and_store_companies(platform, api_ids) + desc_map.update(fetched) + + # 4. 回填 company_desc + enriched = 0 + for idx, cid in need_enrich: + desc = desc_map.get(cid) + if desc: + push_data_list[idx]["company_desc"] = desc + enriched += 1 + + if enriched: + logger.info(f"[enrichment] {platform} 补全 company_desc: {enriched}/{len(need_enrich)}") + + return enriched + + +# ───────────────────────────────────────────── +# MySQL 批量查询 +# ───────────────────────────────────────────── + +async def _batch_lookup_mysql( + platform: str, + company_ids: List[str], +) -> Dict[str, str]: + """批量查 MySQL {Platform}Company 表,返回 {source_company_id: description}""" + if not company_ids: + return {} + + model = company_storage.company_model(platform) + rows = await model.filter( + source_company_id__in=company_ids, + ).values_list("source_company_id", "description") + + return { + str(sid): str(desc) + for sid, desc in rows + if desc + } + + +# ───────────────────────────────────────────── +# API fallback:获取公司详情并写入 MySQL +# ───────────────────────────────────────────── + +async def _fetch_and_store_companies( + platform: str, + company_ids: List[str], +) -> Dict[str, str]: + """逐个调平台 API 获取公司详情,写入 MySQL,返回 {company_id: description}""" + # 延迟导入避免循环依赖 + from app.services.company_cleaner import company_cleaner + + result: Dict[str, str] = {} + + for cid in company_ids: + try: + # boss 需要先加载 token + if platform == "boss": + await company_cleaner._ensure_boss_token_loaded() + + data = await company_cleaner._fetch_company_data(platform, cid) + if not data: + continue + + upsert_result = await company_storage.upsert_company( + platform, data, company_id=cid, + ) + record = upsert_result.get("record") + if record and record.description: + result[cid] = record.description + logger.info(f"[enrichment] {platform} 公司 {cid} 从 API 获取成功") + except Exception as e: + logger.warning(f"[enrichment] {platform} 公司 {cid} API 获取失败: {e}") + + return result diff --git a/app/services/ingest/configs/boss.py b/app/services/ingest/configs/boss.py index d70bce8..0896b1c 100644 --- a/app/services/ingest/configs/boss.py +++ b/app/services/ingest/configs/boss.py @@ -5,45 +5,83 @@ from app.services.ingest.remote_push import safe_get, safe_join def _extract_job_id(data: Dict[str, Any]) -> Optional[str]: - job_base = data.get("jobBaseInfoVO", {}) - val = job_base.get("jobId") if job_base else None + # 新格式: encryptJobId 在顶层 + # 旧格式: jobBaseInfoVO.jobId + val = data.get("encryptJobId") + if not val: + job_base = data.get("jobBaseInfoVO") or {} + val = job_base.get("jobId") or job_base.get("encryptJobId") return str(val) if val else None def _extract_company_name(data: Dict[str, Any]) -> Optional[str]: - name = data.get("name") or (data.get("companyFullInfoVO") or {}).get("name") + name = data.get("brandName") or data.get("name") or (data.get("companyFullInfoVO") or {}).get("name") return str(name) if name else None def _build_boss_push(data: Dict[str, Any]) -> Optional[Dict[str, Any]]: + # 兼容新旧两种 API 格式 + # 旧格式: brandComInfoVO / jobBaseInfoVO / bossBaseInfoVO 嵌套 + # 新格式: 扁平结构,字段直接在顶层 boss_base = data.get("bossBaseInfoVO") or {} job_base = data.get("jobBaseInfoVO") or {} brand = data.get("brandComInfoVO") or {} + + # 职位信息:优先新格式顶层字段,fallback 旧格式嵌套字段 + title = data.get("jobName") or safe_get(job_base, "positionName") + description = data.get("jobDesc") or safe_get(job_base, "jobDesc") + education = data.get("jobDegree") or safe_get(job_base, "degreeName") + years = data.get("jobExperience") or safe_get(job_base, "experienceName") + skills = data.get("skills") or job_base.get("requiredSkills") or [] + welfares = data.get("welfares") or job_base.get("salaryWelfareInfo") or [] + encrypt_job_id = data.get("encryptJobId") or safe_get(job_base, "encryptJobId") + + # 薪资:新格式用 salaryDesc,旧格式用 lowSalary-highSalary + salary = data.get("salaryDesc") or "" + if not salary: + low = safe_get(job_base, "lowSalary") + high = safe_get(job_base, "highSalary") + salary = f"{low}-{high}" if low or high else "" + + # 位置:新格式用 cityName/districtName/businessName,旧格式用 locationName/locationDesc + city = data.get("cityName") or "" + district = data.get("districtName") or "" + business = data.get("businessName") or "" + location = f"{city}{district}{business}".strip() or safe_get(job_base, "locationName", "位置信息未找到") + position = location or safe_get(job_base, "locationDesc", "位置信息未找到") + + # 公司信息:优先新格式顶层字段,fallback 旧格式嵌套字段 + brand_name = data.get("brandName") or safe_get(brand, "brandName") + brand_scale = data.get("brandScale") or safe_get(brand, "scaleName") + brand_stage = data.get("brandStage") or safe_get(brand, "stageName") + encrypt_brand_id = safe_get(brand, "encryptBrandId") + brand_industry = safe_get(brand, "industryName") + return { "source_type": "Boss直聘", - "name": safe_get(brand, "brandName"), - "common_name": safe_get(boss_base, "brandName"), - "title": safe_get(job_base, "positionName"), - "title_addr": safe_get(job_base, "positionName"), - "description": safe_get(job_base, "jobDesc"), - "education": safe_get(job_base, "degreeName"), - "skill": safe_join(job_base.get("requiredSkills")), - "welfare": safe_join(job_base.get("salaryWelfareInfo")), - "years": safe_get(job_base, "experienceName"), - "salary": f'{safe_get(job_base, "lowSalary")}-{safe_get(job_base, "highSalary")}', - "location": safe_get(job_base, "locationName", "位置信息未找到"), - "position": safe_get(job_base, "locationDesc", "位置信息未找到"), + "name": brand_name, + "common_name": brand_name or safe_get(boss_base, "brandName"), + "title": title, + "title_addr": title, + "description": description, + "education": education, + "skill": safe_join(skills), + "welfare": safe_join(welfares), + "years": years, + "salary": salary, + "location": location, + "position": position, "job_type": "全职", - "size": safe_get(brand, "scaleName"), + "size": brand_scale, "employer_type": "全职", - "industry": safe_get(brand, "industryName"), + "industry": brand_industry, "job_1st_class": "", "job_2nd_class": "", "job_3rd_class": "", "job_4th_class": "", "date": "", "start_date": "", "end_date": "", "age": "", "sex": "", "number": "", - "url": f"https://www.zhipin.com/job_detail/{safe_get(job_base, 'encryptJobId')}.html", - "company_id": safe_get(brand, "encryptBrandId"), - "company_name": safe_get(brand, "brandName"), - "company_url": f"https://www.zhipin.com/gongsi/{safe_get(brand, 'encryptBrandId')}.html", + "url": f"https://www.zhipin.com/job_detail/{encrypt_job_id}.html" if encrypt_job_id else "", + "company_id": encrypt_brand_id, + "company_name": brand_name, + "company_url": f"https://www.zhipin.com/gongsi/{encrypt_brand_id}.html" if encrypt_brand_id else "", "company_desc": safe_get(brand, "introduce"), "base_data": data, } diff --git a/app/services/ingest/configs/qcwy.py b/app/services/ingest/configs/qcwy.py index 1ea3a3e..891e04c 100644 --- a/app/services/ingest/configs/qcwy.py +++ b/app/services/ingest/configs/qcwy.py @@ -45,6 +45,11 @@ def _build_qcwy_push(data: Dict[str, Any]) -> Optional[Dict[str, Any]]: raw_area = f"{city_str}{landmark_str}".strip() area_val = raw_area or "位置信息未找到" + job_id = data.get("jobId") or "" + co_id = data.get("coId") or "" + job_url = data.get("jobHref") or (f"https://jobs.51job.com/all/{job_id}.html" if job_id else "") + company_url = data.get("companyHref") or (f"https://jobs.51job.com/all/co{co_id}.html" if co_id else "") + return { "source_type": "前程无忧", "name": data.get("companyName"), @@ -67,10 +72,10 @@ def _build_qcwy_push(data: Dict[str, Any]) -> Optional[Dict[str, Any]]: "employer_type": data.get("companyTypeString"), "industry": f'{data.get("major1Str", "")}-{data.get("major2Str", "")}', "job_1st_class": "", "job_2nd_class": "", "job_3rd_class": "", "job_4th_class": "", - "url": data.get("jobHref"), - "company_id": data.get("coId"), + "url": job_url, + "company_id": co_id, "company_name": data.get("fullCompanyName"), - "company_url": data.get("companyHref"), + "company_url": company_url, "company_desc": data.get("company_desc", ""), "base_data": data, } diff --git a/app/services/ingest/service.py b/app/services/ingest/service.py new file mode 100644 index 0000000..3066920 --- /dev/null +++ b/app/services/ingest/service.py @@ -0,0 +1,160 @@ +import json +from typing import Dict, Any, List, Optional + +from clickhouse_connect.driver import AsyncClient + +from app.log import logger +from app.services.ingest.registry import get_config, list_configs, PlatformConfig +from app.services.ingest.company_enrichment import enrich_company_desc +from app.services.ingest.dedup import build_insert_row, batch_dedup_filter +from app.services.ingest.remote_push import batch_push_to_remote + + +class IngestService: + """统一数据入库服务""" + + def __init__(self, clickhouse_client: AsyncClient): + self.client = clickhouse_client + + async def store_batch( + self, + platform: str, + channel: str, + data_type: str, + data_list: List[Dict[str, Any]], + check_duplicate: bool = True, + ) -> Dict[str, Any]: + results: Dict[str, Any] = { + "total": len(data_list), "success": 0, "failed": 0, "duplicate": 0, "errors": [], + } + if not data_list: + return results + + config = get_config(platform, channel, data_type) + table = f"job_data.{config.table}" + + # 准备所有行 + all_columns: Optional[List[str]] = None + all_values: List[List[Any]] = [] + push_data_list: List[Dict[str, Any]] = [] + + for i, data in enumerate(data_list): + try: + columns, values = build_insert_row(config, data, channel) + all_columns = all_columns or columns + all_values.append(values) + + if config.push_mapper and data_type == "job": + push_item = config.push_mapper(data) + if push_item: + push_data_list.append(push_item) + except Exception as e: + results["failed"] += 1 + results["errors"].append({"index": i, "error": str(e)}) + + if not all_values or all_columns is None: + return results + + # 批量去重 + if check_duplicate: + try: + filtered, ignored = await batch_dedup_filter( + self.client, config, data_list, all_columns, all_values, + ) + all_values = filtered + results["duplicate"] = ignored + except Exception as e: + logger.error(f"批量去重失败: {e}") + + # 批量插入 + if all_values: + try: + await self.client.insert(table, all_values, column_names=all_columns) + results["success"] = len(all_values) + except Exception as e: + logger.error(f"批量插入失败: {e}") + results["failed"] += len(all_values) + results["errors"].append({"error": f"批量插入失败: {e}"}) + + # 异步远程推送(不影响主结果) + if push_data_list: + # 补全 company_desc(优先 MySQL,fallback 平台 API) + try: + await enrich_company_desc(platform, push_data_list) + except Exception as e: + logger.warning(f"company_desc 补全异常: {e}") + + try: + await batch_push_to_remote(push_data_list) + except Exception as e: + logger.warning(f"批量推送失败: {e}") + + return results + + async def store_single( + self, + platform: str, + channel: str, + data_type: str, + data: Dict[str, Any], + check_duplicate: bool = True, + ) -> Dict[str, Any]: + return await self.store_batch(platform, channel, data_type, [data], check_duplicate) + + async def query_data( + self, + platform: str, + channel: str, + data_type: str, + limit: int = 100, + offset: int = 0, + ) -> Dict[str, Any]: + config = get_config(platform, channel, data_type) + table = f"job_data.{config.table}" + + try: + count_result = await self.client.query(f"SELECT count() FROM {table}") + total = count_result.result_rows[0][0] if count_result.result_rows else 0 + + query = f"SELECT * FROM {table} ORDER BY created_at DESC LIMIT {limit} OFFSET {offset}" + result = await self.client.query(query) + + data_rows = [] + for row in result.result_rows: + item = dict(zip(result.column_names, row)) + if "json_data" in item and isinstance(item["json_data"], str): + try: + parsed = json.loads(item["json_data"]) + if isinstance(parsed, dict): + item.update(parsed) + except Exception: + pass + data_rows.append(item) + + return {"success": True, "data": data_rows, "count": total, "table_name": config.table} + except Exception as e: + logger.error(f"查询失败: {e}") + return {"success": False, "message": str(e), "error": str(e)} + + @staticmethod + def get_registry_info() -> Dict[str, Any]: + configs = list_configs() + platforms = sorted({c.platform for c in configs}) + channels = sorted({c.channel for c in configs}) + data_types = sorted({c.data_type for c in configs}) + entries = [ + { + "platform": c.platform, + "channel": c.channel, + "data_type": c.data_type, + "table": c.table, + "dedup_columns": c.dedup_columns, + } + for c in configs + ] + return { + "platforms": platforms, + "channels": channels, + "data_types": data_types, + "entries": entries, + } diff --git a/spiderJobs/platforms/boss/client.py b/spiderJobs/platforms/boss/client.py index 8d0c7e1..7b8312b 100644 --- a/spiderJobs/platforms/boss/client.py +++ b/spiderJobs/platforms/boss/client.py @@ -74,8 +74,8 @@ class BossClient(HTTPClient): def _boss_headers(self) -> dict: """构造每次请求需要动态更新的 Boss 请求头""" return { - "mpt": self.signer.mpt, - "wt2": self.signer.wt2, + "mpt": (self.signer.mpt or "").strip(), + "wt2": (self.signer.wt2 or "").strip(), "Traceid": BossSign.generate_traceid("M-W"), } diff --git a/spiderJobs/platforms/boss/main.py b/spiderJobs/platforms/boss/main.py index c78f10c..8666810 100644 --- a/spiderJobs/platforms/boss/main.py +++ b/spiderJobs/platforms/boss/main.py @@ -33,7 +33,7 @@ if _project_root not in sys.path: sys.path.insert(0, _project_root) from crawler_core.base import BaseFetcher, BaseSearcher -from spiderJobs.platforms.boss.api import GetBrandDetail, SearchRecJobs +from spiderJobs.platforms.boss.api import GetBrandDetail, GetJobDetail, SearchRecJobs from spiderJobs.platforms.boss.client import BossClient, create_client from crawler_core.boss.sign import BossSign from spiderJobs.runner.loop import run_crawl_loop @@ -91,10 +91,51 @@ def create_company_fetcher(company_id: str, http_client: BossClient) -> BaseFetc return GetBrandDetail(brand_id=company_id, client=http_client) +def enrich_boss_job(job: dict, http_client: BossClient) -> Optional[dict]: + """通过 batch 详情接口获取完整 job + 公司信息 + + 列表接口只返回基本字段,详情接口返回完整的 + jobBaseInfoVO / brandComInfoVO / bossBaseInfoVO 等嵌套数据 + """ + security_id = job.get("securityId", "") + encrypt_job_id = job.get("encryptJobId", "") + lid = job.get("lid", "") + + if not security_id or not encrypt_job_id: + return None + + result = GetJobDetail( + security_id=security_id, + job_id=encrypt_job_id, + lid=lid, + client=http_client, + ).fetch() + + if not result.success or not result.data: + print(f" [详情] {encrypt_job_id} 获取失败: {result.error}") + return None + + detail = result.data.get("detail") or {} + print(f" [详情] {encrypt_job_id} 获取成功") + return detail + + def main(): + # 优先环境变量,没有则从后端 API 读取数据库中的 token mpt = os.environ.get("BOSS_MPT", "") wt2 = os.environ.get("BOSS_WT2", "") + if not mpt: + from spiderJobs.runner.api_client import RunnerAPIClient + api = RunnerAPIClient(platform="boss") + token_data = api.fetch_token() + if token_data: + mpt = (token_data.get("mpt") or "").strip() + wt2 = (token_data.get("wt2") or "").strip() + print(f"[boss] 从后端获取 Token 成功: mpt={mpt[:10]}...") + else: + print("[boss] 警告: 未获取到 Token,签名可能失败") + client_kwargs = {} if mpt or wt2: signer = BossSign(mpt=mpt, wt2=wt2) @@ -114,11 +155,12 @@ def main(): platform="boss", create_searcher=create_searcher, create_client_fn=create_client, - max_pages=3, + max_pages=100, data_type="job", client_kwargs=client_kwargs, extract_company_id=extract_company_id, create_company_fetcher=create_company_fetcher, + enrich_job=enrich_boss_job, ) diff --git a/spiderJobs/platforms/job51/main.py b/spiderJobs/platforms/job51/main.py index 058d3ec..d036257 100644 --- a/spiderJobs/platforms/job51/main.py +++ b/spiderJobs/platforms/job51/main.py @@ -104,7 +104,7 @@ def main(): platform="qcwy", create_searcher=create_searcher, create_client_fn=create_client, - max_pages=3, + max_pages=100, data_type="job", client_kwargs=client_kwargs, extract_company_id=extract_company_id, diff --git a/spiderJobs/platforms/zhilian/main.py b/spiderJobs/platforms/zhilian/main.py index 5b9b36f..58fb8a6 100644 --- a/spiderJobs/platforms/zhilian/main.py +++ b/spiderJobs/platforms/zhilian/main.py @@ -91,7 +91,6 @@ def create_company_fetcher(company_id: str, http_client: ZhilianClient) -> BaseF def main(): client_kwargs = {} - proxy = os.environ.get("PROXY_URL", "") if proxy: client_kwargs["proxy"] = proxy @@ -100,7 +99,7 @@ def main(): platform="zhilian", create_searcher=create_searcher, create_client_fn=create_cgate_client, - max_pages=3, + max_pages=100, data_type="job", client_kwargs=client_kwargs, extract_company_id=extract_company_id, diff --git a/spiderJobs/runner/loop.py b/spiderJobs/runner/loop.py new file mode 100644 index 0000000..8d8593c --- /dev/null +++ b/spiderJobs/runner/loop.py @@ -0,0 +1,236 @@ +""" +runner.loop - 通用爬虫主循环 + +提供 run_crawl_loop() 作为所有平台的统一入口。 +各平台只需提供 create_searcher(keyword, client) 工厂函数。 + +可选:传入 extract_company_id / create_company_fetcher +实现搜索 job 时顺带抓取公司详情(维度1)。 +""" + +from __future__ import annotations + +import os +import random +import time +import traceback +from typing import Any, Callable, Optional + +from spiderJobs.core.base import ApiResult, BaseFetcher, BaseSearcher +from spiderJobs.runner.api_client import RunnerAPIClient + + +def sleep_random(min_s: float = 10, max_s: float = 20) -> None: + """反爬随机延迟""" + delay = random.uniform(min_s, max_s) + print(f"[延迟] 等待 {delay:.1f}s ...") + time.sleep(delay) + + +def _crawl_companies_from_jobs( + jobs: list[dict], + *, + extract_company_id: Callable[[dict], Optional[str]], + create_company_fetcher: Callable[[str, Any], BaseFetcher], + http_client: Any, + api: RunnerAPIClient, + seen_companies: set[str], + sleep_min: float, + sleep_max: float, +) -> None: + """从 job 结果中提取公司 ID 并抓取公司详情(内联公司爬取)""" + new_ids: list[str] = [] + for job in jobs: + cid = extract_company_id(job) + if cid and cid not in seen_companies: + seen_companies.add(cid) + new_ids.append(cid) + + if not new_ids: + return + + print(f" [公司] 发现 {len(new_ids)} 个新公司,开始抓取详情...") + ok = 0 + for cid in new_ids: + sleep_random(sleep_min, sleep_max) + try: + fetcher = create_company_fetcher(cid, http_client) + result = fetcher.fetch() + if result.success and result.data: + data = result.data if isinstance(result.data, dict) else {"raw": result.data} + api.upload_data([data], data_type="company") + ok += 1 + else: + print(f" [公司] {cid} 获取失败: {result.error}") + except Exception as e: + print(f" [公司] {cid} 异常: {e}") + + print(f" [公司] 批次完成: {ok}/{len(new_ids)} 成功") + + +def run_crawl_loop( + *, + platform: str, + create_searcher: Callable[[dict, Any], BaseSearcher], + create_client_fn: Callable[..., Any], + max_pages: int = 3, + sleep_min: float = 10, + sleep_max: float = 20, + data_type: str = "job", + api_base_url: str = "", + client_kwargs: dict | None = None, + # ── 可选:内联公司爬取 ── + extract_company_id: Callable[[dict], Optional[str]] | None = None, + create_company_fetcher: Callable[[str, Any], BaseFetcher] | None = None, + # ── 可选:获取详情enrichment ── + enrich_job: Callable[[dict, Any], Optional[dict]] | None = None, +) -> None: + """通用爬虫主循环 + + Args: + platform: 平台标识 (boss/qcwy/zhilian) + create_searcher: 工厂函数 (keyword_dict, http_client) -> BaseSearcher + create_client_fn: 平台 HTTP client 工厂 + max_pages: 每个关键词最大翻页数 + sleep_min/max: 请求间随机延迟范围(秒) + data_type: 数据类型 (job/company) + api_base_url: 后端 API 地址 + client_kwargs: 传给 create_client_fn 的额外参数 + extract_company_id: 从单条 job dict 提取公司 ID (可选) + create_company_fetcher: 创建公司详情 fetcher (company_id, http_client) -> BaseFetcher (可选) + enrich_job: 从列表 job dict 获取完整详情 (job_dict, http_client) -> 详情 dict (可选) + """ + max_pages = int(os.environ.get("MAX_PAGES", str(max_pages))) + sleep_min = float(os.environ.get("SLEEP_MIN_SECONDS", str(sleep_min))) + sleep_max = float(os.environ.get("SLEEP_MAX_SECONDS", str(sleep_max))) + inline_company = bool( + extract_company_id and create_company_fetcher + and os.environ.get("INLINE_COMPANY", "1") != "0" + ) + + api = RunnerAPIClient( + base_url=api_base_url, + platform=platform, + ) + + print(f"[{platform}] 爬虫启动 | crawler_id={api.crawler_id}") + print(f"[{platform}] API: {api.base_url} | max_pages={max_pages} | delay={sleep_min}-{sleep_max}s") + if inline_company: + print(f"[{platform}] 内联公司爬取: 已启用 (INLINE_COMPANY=0 可关闭)") + + http_client = create_client_fn(**(client_kwargs or {})) + + # 会话级公司 ID 去重集合 + seen_companies: set[str] = set() + + while True: + try: + # 1. 获取关键词 + keywords = api.fetch_keyword(limit=1) + if not keywords: + print(f"[{platform}] 无可用关键词,等待 60s 后重试...") + time.sleep(60) + continue + + kw = keywords[0] + kw_id = kw["id"] + city = kw.get("city", "") + job = kw.get("job", "") + start_page = (kw.get("last_completed_page") or 0) + 1 + + print(f"\n[{platform}] 开始爬取: city={city} job={job} (id={kw_id}, 从第{start_page}页)") + + # 2. 创建 searcher + searcher = create_searcher(kw, http_client) + + # 3. 带断点续爬的分页抓取 + all_jobs: list[dict] = [] + last_page = start_page - 1 + error_occurred = False + + for page_index in range(start_page, start_page + max_pages): + sleep_random(sleep_min, sleep_max) + + try: + result = searcher.search(page_index=page_index) + except Exception as e: + print(f"[{platform}] 第{page_index}页请求异常: {e}") + error_occurred = True + api.report_crawl_complete( + kw_id, status="failed", error_message=str(e) + ) + break + + if not result.success: + print(f"[{platform}] 第{page_index}页失败: {result.error}") + error_occurred = True + api.report_crawl_complete( + kw_id, + status="failed", + error_message=result.error or "unknown", + ) + break + + page_jobs = result.list + all_jobs.extend(page_jobs) + last_page = page_index + + # 汇报进度 + api.report_page_progress( + keyword_id=kw_id, + page=page_index, + jobs_found=len(page_jobs), + ) + + print( + f"[{platform}] 第{page_index}页: {len(page_jobs)}条 | " + f"累计: {len(all_jobs)}条 | is_end={result.is_end_page}" + ) + + # 上传本页数据(实时推送,不积攒) + if page_jobs: + if enrich_job: + # 逐条获取详情并立即上报,避免批量失败浪费 + for j, job in enumerate(page_jobs): + try: + detail = enrich_job(job, http_client) + upload_job = detail if detail else job + except Exception as e: + print(f" [详情] 第{j+1}条获取失败: {e}") + upload_job = job + + api.upload_data([upload_job], data_type=data_type) + + if j < len(page_jobs) - 1: + sleep_random(sleep_min, sleep_max) + else: + api.upload_data(page_jobs, data_type=data_type) + + # 内联公司爬取:从本页 job 中提取公司并抓取详情 + if inline_company and page_jobs: + _crawl_companies_from_jobs( + page_jobs, + extract_company_id=extract_company_id, + create_company_fetcher=create_company_fetcher, + http_client=http_client, + api=api, + seen_companies=seen_companies, + sleep_min=sleep_min, + sleep_max=sleep_max, + ) + + if result.is_end_page: + break + + # 4. 完成 + if not error_occurred: + api.report_crawl_complete(kw_id, status="completed") + print(f"[{platform}] 关键词 {city}/{job} 完成,共{len(all_jobs)}条") + + except KeyboardInterrupt: + print(f"\n[{platform}] 收到中断信号,退出...") + break + except Exception as e: + print(f"[{platform}] 主循环异常: {e}") + traceback.print_exc() + time.sleep(30)