JobData/app/services/ingest_service.py

108 lines
5.0 KiB
Python

import json
from datetime import datetime
from typing import Dict, Any, List, Optional, Tuple
from clickhouse_connect.driver import AsyncClient
class IngestService:
def __init__(self, client: AsyncClient):
self.client = client
def _table_name(self, platform: str, data_type: str) -> str:
return f"job_data.{platform}_{data_type}"
def _build_row(self, platform: str, data_type: str, data: Dict[str, Any]) -> Tuple[List[str], List[Any]]:
now = datetime.now()
columns = ["id", "json_data", "created_at", "updated_at"]
values = [0, json.dumps(data, ensure_ascii=False), now, now]
if platform == "boss" and data_type == "job":
job_base = data.get("jobBaseInfoVO", {})
columns += ["job_id"]
values += [str(job_base.get("jobId", ""))]
elif platform == "qcwy" and data_type == "job":
columns += ["job_id", "update_date_time"]
values += [str(data.get("jobId", "")), str(data.get("updateDateTime", ""))]
elif platform == "zhilian" and data_type == "job":
columns += ["number", "first_publish_time"]
values += [str(data.get("number", "")), str(data.get("firstPublishTime", ""))]
elif data_type == "company":
name = None
if platform == "boss":
name = data.get("brandComInfoVO", {}).get("brandName") or data.get("name")
elif platform == "qcwy":
name = data.get("fullCompanyName") or data.get("companyName")
elif platform == "zhilian":
name = data.get("companyName") or data.get("name")
columns += ["company_name"]
values += [str(name or "")]
return columns, values
def _dup_conditions(self, platform: str, data_type: str, data: Dict[str, Any]) -> Optional[Tuple[str, List[Any]]]:
if platform == "boss" and data_type == "job":
job_base = data.get("jobBaseInfoVO", {})
job_id = job_base.get("jobId")
if not job_id:
return None
return "job_id = %s", [str(job_id)]
if platform == "qcwy" and data_type == "job":
job_id = data.get("jobId")
update_dt = data.get("updateDateTime")
if not job_id or not update_dt:
return None
return "job_id = %s AND update_date_time = %s", [str(job_id), str(update_dt)]
if platform == "zhilian" and data_type == "job":
number = data.get("number")
fpt = data.get("firstPublishTime")
if not number or not fpt:
return None
return "number = %s AND first_publish_time = %s", [str(number), str(fpt)]
if data_type == "company":
name = None
if platform == "boss":
name = data.get("brandComInfoVO", {}).get("brandName") or data.get("name")
elif platform == "qcwy":
name = data.get("fullCompanyName") or data.get("companyName")
elif platform == "zhilian":
name = data.get("companyName") or data.get("name")
if not name:
return None
return "company_name = %s", [str(name)]
return None
async def store_single(self, platform: str, data_type: str, data: Dict[str, Any], check_duplicate: bool = True) -> Dict[str, int]:
table = self._table_name(platform, data_type)
if check_duplicate:
cond = self._dup_conditions(platform, data_type, data)
if cond:
where_sql, params = cond
q = f"SELECT 1 FROM {table} WHERE {where_sql} LIMIT 1"
r = await self.client.query(q, params)
if r.result_rows:
return {"inserted": 0, "ignored": 1}
cols, vals = self._build_row(platform, data_type, data)
await self.client.insert(table, [vals], column_names=cols)
return {"inserted": 1, "ignored": 0}
async def store_batch(self, platform: str, data_type: str, data_list: List[Dict[str, Any]], check_duplicate: bool = True) -> Dict[str, int]:
table = self._table_name(platform, data_type)
if not data_list:
return {"inserted": 0, "ignored": 0}
rows: List[List[Any]] = []
columns: Optional[List[str]] = None
ignored = 0
for d in data_list:
if check_duplicate:
cond = self._dup_conditions(platform, data_type, d)
if cond:
where_sql, params = cond
q = f"SELECT 1 FROM {table} WHERE {where_sql} LIMIT 1"
r = await self.client.query(q, params)
if r.result_rows:
ignored += 1
continue
cols, vals = self._build_row(platform, data_type, d)
columns = columns or cols
rows.append(vals)
if rows:
await self.client.insert(table, rows, column_names=columns)
return {"inserted": len(rows), "ignored": ignored}