win 24918a272b feat: 爬虫优化 — company_desc 补全、Boss详情获取、URL修复
- 新增 company_enrichment.py: job 入库时自动补全 company_desc
  (优先查 MySQL,fallback 调平台 API 获取并入库)
- Boss 爬虫: 搜索列表后逐条调 batch 详情接口拿完整数据
  (jobBaseInfoVO/brandComInfoVO),每条获取后立即上报
- Boss push_mapper: 兼容新旧两种 API 格式(扁平/嵌套VO)
- Boss token: 启动时自动从后端 API 读取数据库中的 mpt/wt2
- Boss client: header 值 strip 防止空格导致请求失败
- qcwy URL: 用 jobId/coId 拼接 jobs.51job.com 格式
- 三个平台 max_pages 默认改为 100
2026-03-22 21:54:19 +08:00

161 lines
5.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
from typing import Dict, Any, List, Optional
from clickhouse_connect.driver import AsyncClient
from app.log import logger
from app.services.ingest.registry import get_config, list_configs, PlatformConfig
from app.services.ingest.company_enrichment import enrich_company_desc
from app.services.ingest.dedup import build_insert_row, batch_dedup_filter
from app.services.ingest.remote_push import batch_push_to_remote
class IngestService:
    """Unified data-ingestion service.

    Builds insert rows from raw records using the registry configuration
    selected by ``(platform, channel, data_type)``, optionally de-duplicates
    them, bulk-inserts them into the matching ``job_data.<table>`` table, and
    forwards mapped job records to the remote push endpoint.
    """

    def __init__(self, clickhouse_client: "AsyncClient"):
        """Keep the async ClickHouse client used for every query and insert."""
        self.client = clickhouse_client

    async def store_batch(
        self,
        platform: str,
        channel: str,
        data_type: str,
        data_list: List[Dict[str, Any]],
        check_duplicate: bool = True,
    ) -> Dict[str, Any]:
        """Insert a batch of records and report per-batch counters.

        Args:
            platform: Registry platform key.
            channel: Registry channel key; also passed to ``build_insert_row``.
            data_type: Registry data-type key. Remote push is attempted only
                when this equals ``"job"`` and the config has a push mapper.
            data_list: Raw records to store.
            check_duplicate: When true, rows are passed through
                ``batch_dedup_filter`` before insertion.

        Returns:
            A dict with ``total``, ``success``, ``failed``, ``duplicate`` and
            ``errors`` (a list of ``{"index", "error"}`` / ``{"error"}`` dicts).

        Raises:
            Whatever ``get_config`` raises for an unknown registry key; all
            later failures are caught and reported in the result or the log.
        """
        results: Dict[str, Any] = {
            "total": len(data_list), "success": 0, "failed": 0, "duplicate": 0, "errors": [],
        }
        if not data_list:
            return results

        config = get_config(platform, channel, data_type)
        table = f"job_data.{config.table}"

        # Prepare all rows. ``built_data`` holds the source record of every
        # row in ``all_values`` so both sequences stay index-aligned even when
        # some records fail to build.
        all_columns: Optional[List[str]] = None
        all_values: List[List[Any]] = []
        built_data: List[Dict[str, Any]] = []
        push_data_list: List[Dict[str, Any]] = []
        wants_push = bool(config.push_mapper) and data_type == "job"

        for i, data in enumerate(data_list):
            try:
                columns, values = build_insert_row(config, data, channel)
            except Exception as e:
                results["failed"] += 1
                results["errors"].append({"index": i, "error": str(e)})
                continue

            # The first successfully built row decides the column list.
            all_columns = all_columns or columns
            all_values.append(values)
            built_data.append(data)

            if not wants_push:
                continue
            # A push-mapping failure must not mark the row as failed: the row
            # is already queued for insertion, so counting it as failed would
            # report it under both "failed" and "success".
            try:
                push_item = config.push_mapper(data)
            except Exception as e:
                logger.warning(f"推送数据映射失败 (index={i}): {e}")
                continue
            if push_item:
                push_data_list.append(push_item)

        if not all_values or all_columns is None:
            return results

        # Batch de-duplication. Best effort: on failure the unfiltered rows
        # are still inserted.
        if check_duplicate:
            try:
                # NOTE(review): batch_dedup_filter presumably pairs each record
                # with its row by position, hence the aligned ``built_data``
                # rather than the raw ``data_list`` — confirm against dedup.py.
                filtered, ignored = await batch_dedup_filter(
                    self.client, config, built_data, all_columns, all_values,
                )
                all_values = filtered
                results["duplicate"] = ignored
            except Exception as e:
                logger.error(f"批量去重失败: {e}")

        # Bulk insert; a failure marks every remaining row as failed.
        if all_values:
            try:
                await self.client.insert(table, all_values, column_names=all_columns)
                results["success"] = len(all_values)
            except Exception as e:
                logger.error(f"批量插入失败: {e}")
                results["failed"] += len(all_values)
                results["errors"].append({"error": f"批量插入失败: {e}"})

        # Remote push (does not affect the main result).
        if push_data_list:
            # Fill in company_desc before pushing (MySQL first, with the
            # platform API as fallback, per the original note).
            try:
                await enrich_company_desc(platform, push_data_list)
            except Exception as e:
                logger.warning(f"company_desc 补全异常: {e}")
            try:
                await batch_push_to_remote(push_data_list)
            except Exception as e:
                logger.warning(f"批量推送失败: {e}")

        return results

    async def store_single(
        self,
        platform: str,
        channel: str,
        data_type: str,
        data: Dict[str, Any],
        check_duplicate: bool = True,
    ) -> Dict[str, Any]:
        """Store one record by delegating to :meth:`store_batch` with a one-item list."""
        return await self.store_batch(platform, channel, data_type, [data], check_duplicate)

    async def query_data(
        self,
        platform: str,
        channel: str,
        data_type: str,
        limit: int = 100,
        offset: int = 0,
    ) -> Dict[str, Any]:
        """Return one page of rows from the table mapped to the registry key.

        Rows are ordered by ``created_at`` descending. When a row has a string
        ``json_data`` column that decodes to a JSON object, its keys are merged
        into the row dict (overwriting same-named columns).

        Returns:
            ``{"success": True, "data", "count", "table_name"}`` on success, or
            ``{"success": False, "message", "error"}`` when the query fails or
            ``limit`` / ``offset`` are not integer-like.
        """
        config = get_config(platform, channel, data_type)
        table = f"job_data.{config.table}"
        try:
            # Coerce before interpolating so only integers can reach the SQL
            # text; anything non-numeric fails here and is reported below.
            page_size = int(limit)
            page_start = int(offset)

            count_result = await self.client.query(f"SELECT count() FROM {table}")
            total = count_result.result_rows[0][0] if count_result.result_rows else 0

            query = (
                f"SELECT * FROM {table} ORDER BY created_at DESC "
                f"LIMIT {page_size} OFFSET {page_start}"
            )
            result = await self.client.query(query)

            data_rows = []
            for row in result.result_rows:
                item = dict(zip(result.column_names, row))
                raw_json = item.get("json_data")
                if isinstance(raw_json, str):
                    try:
                        parsed = json.loads(raw_json)
                    except Exception:
                        # Best effort: an undecodable payload leaves the row
                        # as returned by ClickHouse.
                        parsed = None
                    if isinstance(parsed, dict):
                        item.update(parsed)
                data_rows.append(item)

            return {"success": True, "data": data_rows, "count": total, "table_name": config.table}
        except Exception as e:
            logger.error(f"查询失败: {e}")
            return {"success": False, "message": str(e), "error": str(e)}

    @staticmethod
    def get_registry_info() -> Dict[str, Any]:
        """Summarise the ingest registry.

        Returns:
            A dict with the sorted distinct ``platforms``, ``channels`` and
            ``data_types``, plus ``entries`` describing every registered
            config (platform, channel, data_type, table, dedup_columns).
        """
        configs = list_configs()
        entries = [
            {
                "platform": c.platform,
                "channel": c.channel,
                "data_type": c.data_type,
                "table": c.table,
                "dedup_columns": c.dedup_columns,
            }
            for c in configs
        ]
        return {
            "platforms": sorted({c.platform for c in configs}),
            "channels": sorted({c.channel for c in configs}),
            "data_types": sorted({c.data_type for c in configs}),
            "entries": entries,
        }