- 新增 company_enrichment.py: job 入库时自动补全 company_desc (优先查 MySQL,fallback 调平台 API 获取并入库) - Boss 爬虫: 搜索列表后逐条调 batch 详情接口拿完整数据 (jobBaseInfoVO/brandComInfoVO),每条获取后立即上报 - Boss push_mapper: 兼容新旧两种 API 格式(扁平/嵌套VO) - Boss token: 启动时自动从后端 API 读取数据库中的 mpt/wt2 - Boss client: header 值 strip 防止空格导致请求失败 - qcwy URL: 用 jobId/coId 拼接 jobs.51job.com 格式 - 三个平台 max_pages 默认改为 100
161 lines
5.6 KiB
Python
161 lines
5.6 KiB
Python
import json
|
||
from typing import Dict, Any, List, Optional
|
||
|
||
from clickhouse_connect.driver import AsyncClient
|
||
|
||
from app.log import logger
|
||
from app.services.ingest.registry import get_config, list_configs, PlatformConfig
|
||
from app.services.ingest.company_enrichment import enrich_company_desc
|
||
from app.services.ingest.dedup import build_insert_row, batch_dedup_filter
|
||
from app.services.ingest.remote_push import batch_push_to_remote
|
||
|
||
|
||
class IngestService:
    """Unified ingestion service for crawled platform data.

    Wraps a ClickHouse client and uses the platform registry
    (``get_config`` / ``list_configs``) to resolve which ``job_data`` table a
    ``(platform, channel, data_type)`` triple is stored in.
    """

    def __init__(self, clickhouse_client: "AsyncClient"):
        """Store the ClickHouse client used for every insert and query."""
        self.client = clickhouse_client

    async def store_batch(
        self,
        platform: str,
        channel: str,
        data_type: str,
        data_list: List[Dict[str, Any]],
        check_duplicate: bool = True,
    ) -> Dict[str, Any]:
        """Build, de-duplicate and insert a batch of records, then push jobs.

        Args:
            platform: Registry key part identifying the source platform.
            channel: Registry key part; also forwarded to ``build_insert_row``.
            data_type: Registry key part; remote push only happens for ``"job"``.
            data_list: Raw records to store.
            check_duplicate: When true, run ``batch_dedup_filter`` before the
                insert.

        Returns:
            A dict with the keys ``total``, ``success``, ``failed``,
            ``duplicate`` and ``errors`` (a list of error dicts).

        Raises:
            Whatever ``get_config`` raises for an unknown registry key; every
            later step catches its own exceptions and reports or logs them.
        """
        results: Dict[str, Any] = {
            "total": len(data_list), "success": 0, "failed": 0, "duplicate": 0, "errors": [],
        }
        if not data_list:
            return results

        config = get_config(platform, channel, data_type)
        table = f"job_data.{config.table}"

        # Prepare every row. ``built_data`` holds the source records that
        # produced a row, index-aligned with ``all_values``.
        all_columns: Optional[List[str]] = None
        all_values: List[List[Any]] = []
        built_data: List[Dict[str, Any]] = []
        push_data_list: List[Dict[str, Any]] = []

        for i, data in enumerate(data_list):
            try:
                columns, values = build_insert_row(config, data, channel)
            except Exception as e:
                results["failed"] += 1
                results["errors"].append({"index": i, "error": str(e)})
                continue

            all_columns = all_columns or columns
            all_values.append(values)
            built_data.append(data)

            if config.push_mapper and data_type == "job":
                # The push payload is best-effort: a mapper failure must not
                # mark a row as failed when that row is still going to be
                # inserted (it would be counted as both failed and success).
                try:
                    push_item = config.push_mapper(data)
                except Exception as e:
                    logger.warning(f"推送数据映射失败 (index={i}): {e}")
                else:
                    if push_item:
                        push_data_list.append(push_item)

        if not all_values or all_columns is None:
            return results

        # Batch de-duplication. Pass only the records that built successfully
        # so the record list and the value rows stay aligned.
        # NOTE(review): assumes batch_dedup_filter pairs records with rows by
        # index -- confirm against its implementation.
        if check_duplicate:
            try:
                filtered, ignored = await batch_dedup_filter(
                    self.client, config, built_data, all_columns, all_values,
                )
                all_values = filtered
                results["duplicate"] = ignored
            except Exception as e:
                # De-duplication is best-effort: fall through and insert all rows.
                logger.error(f"批量去重失败: {e}")

        # Batch insert.
        if all_values:
            try:
                await self.client.insert(table, all_values, column_names=all_columns)
                results["success"] = len(all_values)
            except Exception as e:
                logger.error(f"批量插入失败: {e}")
                results["failed"] += len(all_values)
                results["errors"].append({"error": f"批量插入失败: {e}"})

        # Remote push; failures here are logged and never change ``results``.
        if push_data_list:
            # Fill in company_desc first (per the original note: MySQL lookup
            # first, platform API as fallback -- implemented in the helper).
            try:
                await enrich_company_desc(platform, push_data_list)
            except Exception as e:
                logger.warning(f"company_desc 补全异常: {e}")

            try:
                await batch_push_to_remote(push_data_list)
            except Exception as e:
                logger.warning(f"批量推送失败: {e}")

        return results

    async def store_single(
        self,
        platform: str,
        channel: str,
        data_type: str,
        data: Dict[str, Any],
        check_duplicate: bool = True,
    ) -> Dict[str, Any]:
        """Store one record by delegating to ``store_batch`` with a 1-item list."""
        return await self.store_batch(platform, channel, data_type, [data], check_duplicate)

    @staticmethod
    def _build_page_query(table: str, limit: int, offset: int) -> str:
        """Return the paged SELECT statement for ``table``.

        ``limit`` and ``offset`` are coerced with ``int()`` so that only
        integers ever reach the SQL text; non-numeric input raises
        ``ValueError``/``TypeError`` instead of being interpolated.
        """
        return (
            f"SELECT * FROM {table} ORDER BY created_at DESC "
            f"LIMIT {int(limit)} OFFSET {int(offset)}"
        )

    @staticmethod
    def _merge_json_data(item: Dict[str, Any]) -> Dict[str, Any]:
        """Merge a string ``json_data`` column into ``item`` when it decodes to a dict.

        Undecodable or non-dict payloads leave ``item`` unchanged. The same
        dict object is updated in place and returned.
        """
        raw = item.get("json_data")
        if isinstance(raw, str):
            try:
                parsed = json.loads(raw)
            except (ValueError, RecursionError):
                # Malformed JSON is tolerated; the raw column is still returned.
                return item
            if isinstance(parsed, dict):
                item.update(parsed)
        return item

    async def query_data(
        self,
        platform: str,
        channel: str,
        data_type: str,
        limit: int = 100,
        offset: int = 0,
    ) -> Dict[str, Any]:
        """Return one page of rows, newest first, from the configured table.

        Returns:
            On success ``{"success": True, "data", "count", "table_name"}``
            where ``count`` is the table's total row count; on any query
            error (including non-integer ``limit``/``offset``)
            ``{"success": False, "message", "error"}``.

        Raises:
            Whatever ``get_config`` raises for an unknown registry key.
        """
        config = get_config(platform, channel, data_type)
        table = f"job_data.{config.table}"

        try:
            # Validate paging arguments before touching the database.
            query = self._build_page_query(table, limit, offset)

            count_result = await self.client.query(f"SELECT count() FROM {table}")
            total = count_result.result_rows[0][0] if count_result.result_rows else 0

            result = await self.client.query(query)
            data_rows = [
                self._merge_json_data(dict(zip(result.column_names, row)))
                for row in result.result_rows
            ]

            return {"success": True, "data": data_rows, "count": total, "table_name": config.table}
        except Exception as e:
            logger.error(f"查询失败: {e}")
            return {"success": False, "message": str(e), "error": str(e)}

    @staticmethod
    def get_registry_info() -> Dict[str, Any]:
        """Summarise the registry: distinct platforms/channels/data types and all entries."""
        configs = list_configs()
        entries = [
            {
                "platform": c.platform,
                "channel": c.channel,
                "data_type": c.data_type,
                "table": c.table,
                "dedup_columns": c.dedup_columns,
            }
            for c in configs
        ]
        return {
            "platforms": sorted({c.platform for c in configs}),
            "channels": sorted({c.channel for c in configs}),
            "data_types": sorted({c.data_type for c in configs}),
            "entries": entries,
        }
|