JobData/app/services/company_jobs_sync.py
win 3d202c3486 feat(05): data pipeline optimization (DATA-01, DATA-04)
Plan 01 - DATA-01: 30-day window dedup fix:
- dedup.py: both single-field and double-field SQL queries now include
  AND created_at > now() - INTERVAL 30 DAY
- tests/ingest/test_dedup.py: 6 mock tests validating the 30-day window

Plan 02 - DATA-04: company vs search job channel separation:
- schemas/ingest.py: ChannelType.COMPANY = 'company'
- configs/boss.py: register channel='company' config
- configs/qcwy.py: register channel='company' config
- configs/zhilian.py: register channel='company' config
- company_jobs_sync.py: store_batch(..., 'mini', ...) → (..., 'company', ...)

DATA-02: confirmed as already complete (job.py has the /data/batch-async endpoint)
DATA-03: confirmed as already complete (full pipeline in company_cleaner.py)

Full regression: 112 passed (106 existing + 6 new)
2026-03-21 19:50:06 +08:00

134 lines
4.9 KiB
Python

import asyncio
import time
from typing import Any, Dict, List, Optional
from loguru import logger
from app.core.clickhouse import clickhouse_manager
from app.models.token import BossToken
from app.services.crawler.boss import BossService
from app.services.crawler.qcwy import QcwyService
from app.services.crawler.zhilian import ZhilianService
from app.services.ingest import IngestService
def _qcwy_extract_items(resp: dict) -> list:
    """Pull the job-item list out of a qcwy API response.

    Lookup order:

    1. ``resultbody.job.items``. ``resultBody`` is consulted when
       ``resultbody`` is missing or falsy.
    2. The first top-level key among ``items``, ``list``, ``jobs``,
       ``jobList`` whose value is a list.

    Returns the matching list object itself (not a copy). Returns ``[]``
    when ``resp`` is not a dict or nothing matches.
    """
    if isinstance(resp, dict):
        body = resp.get("resultbody") or resp.get("resultBody")
        job_section = body.get("job") if isinstance(body, dict) else None
        nested_items = job_section.get("items") if isinstance(job_section, dict) else None
        if isinstance(nested_items, list):
            return nested_items
        # Fall back to well-known top-level keys, first list wins.
        fallback_keys = ("items", "list", "jobs", "jobList")
        return next(
            (value for value in map(resp.get, fallback_keys) if isinstance(value, list)),
            [],
        )
    return []
class CompanyJobsSyncService:
    """Fetch a company's job listings from a recruiting source and store them.

    Supported sources are ``"boss"``, ``"qcwy"`` and ``"zhilian"``. Each is
    served by its own crawler service created in ``__init__``. Extracted jobs
    are written through ``IngestService.store_batch`` with ``"company"`` as
    the channel argument.
    """

    # Seconds after which the cached Boss token is re-read from the database
    # (compared against ``time.time()`` deltas in ``_ensure_boss_token_loaded``).
    _TOKEN_REFRESH_INTERVAL = 3600
    # Sources accepted by ``sync_company_jobs``.
    _SUPPORTED_SOURCES = ("boss", "qcwy", "zhilian")

    def __init__(self):
        self.boss_service = BossService()
        self.qcwy_service = QcwyService()
        self.zhilian_service = ZhilianService()
        # Created lazily on first use by ``get_data_router``.
        self.data_router: Optional[IngestService] = None
        self._boss_token_loaded = False
        # ``time.time()`` value of the last successful token load.
        self._token_loaded_at: float = 0

    def set_proxy(self, proxy: Optional[str]) -> None:
        """Apply the same proxy setting to all three crawler services."""
        self.boss_service.set_proxy(proxy)
        self.qcwy_service.set_proxy(proxy)
        self.zhilian_service.set_proxy(proxy)

    async def get_data_router(self) -> "IngestService":
        """Return the cached ingest service, creating it on first call.

        A ClickHouse client is requested from ``clickhouse_manager`` only when
        no router exists yet.
        """
        if not self.data_router:
            client = await clickhouse_manager.get_client()
            self.data_router = IngestService(client)
        return self.data_router

    async def _ensure_boss_token_loaded(self) -> None:
        """Load the most recently updated active ``BossToken`` into the Boss crawler.

        The database lookup is skipped when a token with a non-empty ``mpt``
        was loaded less than ``_TOKEN_REFRESH_INTERVAL`` seconds ago.

        If no active token exists, a warning is logged and the crawler keeps
        whatever login data it already had.
        """
        now = time.time()
        if (
            self._boss_token_loaded
            and self.boss_service.login_data.get("mpt")
            and now - self._token_loaded_at < self._TOKEN_REFRESH_INTERVAL
        ):
            return
        token_obj = await BossToken.filter(is_active=True).order_by("-updated_at").first()
        if not token_obj:
            logger.warning("BossToken not found or inactive in CompanyJobsSyncService")
            return
        # NOTE(review): the second argument is always "" here; its meaning is
        # defined by BossService.set_login_data — confirm.
        self.boss_service.set_login_data(token_obj.mpt or "", "")
        self._boss_token_loaded = True
        self._token_loaded_at = now

    async def sync_company_jobs(self, source: str, company_id: str) -> Dict[str, Any]:
        """Fetch one company's jobs from ``source`` and store them.

        Args:
            source: One of ``"boss"``, ``"qcwy"`` or ``"zhilian"``.
            company_id: Company identifier passed to the source's crawler.

        Returns:
            A summary dict. It always carries these keys: ``success``,
            ``source``, ``company_id``, ``jobs_fetched``, ``stored_success``,
            ``duplicate``, ``failed``, ``errors``, and ``original_data`` (the
            raw crawler response).

            The counters and ``errors`` come from ``store_batch``'s result,
            defaulting to ``0`` / ``[]``. They stay at those defaults when no
            jobs were extracted.

        Raises:
            ValueError: If ``source`` is not supported. This is checked before
                any ClickHouse connection or crawler request is made.
        """
        if source not in self._SUPPORTED_SOURCES:
            raise ValueError(f"unsupported source: {source}")
        router = await self.get_data_router()

        # Crawler methods are invoked via asyncio.to_thread (presumably
        # blocking I/O) so the event loop is not blocked while they run.
        if source == "boss":
            await self._ensure_boss_token_loaded()
            data = await asyncio.to_thread(self.boss_service.get_company_jobs_by_id, company_id)
            jobs = self._extract_boss_jobs(data)
        elif source == "qcwy":
            data = await asyncio.to_thread(self.qcwy_service.get_company_jobs_by_id, company_id)
            jobs = self._extract_qcwy_jobs(data)
        else:
            data = await asyncio.to_thread(self.zhilian_service.get_company_jobs_by_id, company_id)
            jobs = self._extract_zhilian_jobs(data)

        # One response shape for both paths, so callers can rely on every key.
        result: Dict[str, Any] = {
            "success": True,
            "source": source,
            "company_id": company_id,
            "jobs_fetched": len(jobs),
            "stored_success": 0,
            "duplicate": 0,
            "failed": 0,
            "errors": [],
            "original_data": data,
        }
        if not jobs:
            return result

        store_result = await router.store_batch(source, "company", "job", jobs)
        result.update(
            stored_success=store_result.get("success", 0),
            duplicate=store_result.get("duplicate", 0),
            failed=store_result.get("failed", 0),
            errors=store_result.get("errors", []),
        )
        return result

    @staticmethod
    def _extract_boss_jobs(data: Optional[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return the job list from a Boss response.

        The list is read from ``zpData.jobList`` or, failing that,
        ``zpData.list``.

        Returns ``[]`` when ``data`` or ``zpData`` is not a dict, or when
        neither key holds a list.
        """
        if not isinstance(data, dict):
            return []
        zp_data = data.get("zpData")
        # Guard against a truthy non-dict zpData, which would break ``.get``.
        if not isinstance(zp_data, dict):
            return []
        for key in ("jobList", "list"):
            jobs = zp_data.get(key)
            if isinstance(jobs, list):
                return jobs
        return []

    @staticmethod
    def _extract_qcwy_jobs(data: Optional[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return the job list from a qcwy response via ``_qcwy_extract_items``.

        Returns ``[]`` for a falsy payload or when nothing is found.
        """
        jobs_list = _qcwy_extract_items(data or {})
        return jobs_list if isinstance(jobs_list, list) else []

    @staticmethod
    def _extract_zhilian_jobs(data: Optional[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return the job list from a Zhilian response.

        The list is read from the top-level ``list`` or, failing that, from
        ``data.list``.

        Returns ``[]`` when ``data`` is not a dict, when ``data["data"]`` is
        not a dict, or when no list is found.
        """
        if not isinstance(data, dict):
            return []
        jobs = data.get("list")
        if isinstance(jobs, list):
            return jobs
        data_field = data.get("data")
        # Guard against a truthy non-dict "data" field, which would break ``.get``.
        if not isinstance(data_field, dict):
            return []
        jobs = data_field.get("list")
        return jobs if isinstance(jobs, list) else []