# JobData/app/services/cleaning.py
# 2026-03-22 23:22:30 +08:00
#
# 328 lines
# 14 KiB
# Python

import asyncio
import csv
import io
import re
import time
from typing import List, Dict, Any, Union, Optional
from fastapi import UploadFile
from loguru import logger
from app.services.crawler.boss import BossService
from app.services.crawler.qcwy import QcwyService
from app.services.crawler.zhilian import ZhilianService
from app.services.company_jobs_sync import CompanyJobsSyncService
from app.services.company_storage import company_storage
from app.services.ingest import IngestService
from app.core.clickhouse import clickhouse_manager
from app.models.token import BossToken
class CleaningService:
    """Fetch job/company data from recruiting platforms and persist it.

    Platforms: ``boss`` (zhipin.com), ``qcwy`` (51job.com) and ``zhilian``
    (zhaopin.com). Job records are stored through an ``IngestService`` built
    on a ClickHouse client; company records go through ``company_storage``.
    """

    _TOKEN_REFRESH_INTERVAL = 3600  # seconds: reload the Boss token at most once per hour

    # Platforms accepted by the per-platform cleaners (job_id/company_name/company_id).
    _PLATFORMS = ("boss", "qcwy", "zhilian")

    # ID-extraction patterns shared by URL routing and the by-ID cleaners.
    _BOSS_JOB_RE = re.compile(r'job_detail/([^.]+)\.html')
    _BOSS_COMPANY_RE = re.compile(r'gongsi/([^.]+)\.html')
    _QCWY_JOB_RE = re.compile(r'/(\d+)\.html')
    _QCWY_COMPANY_RE = re.compile(r'co(\d+)', re.IGNORECASE)
    _QCWY_COMPANY_ID_RE = re.compile(r'^co(\d+)$')  # bare "co123"-style 51job company IDs
    _ZHILIAN_JOB_RE = re.compile(r'jobs\.zhaopin\.com/(\w+)\.htm')
    _ZHILIAN_COMPANY_RE = re.compile(r'/company/([A-Za-z0-9]+)')

    def __init__(self):
        self.boss_service = BossService()
        self.qcwy_service = QcwyService()
        self.zhilian_service = ZhilianService()
        self.company_jobs_sync = CompanyJobsSyncService()
        self.data_router: Optional[IngestService] = None  # created lazily by get_data_router()
        self._boss_token_loaded = False
        self._token_loaded_at: float = 0.0  # time.monotonic() of the last successful token load

    # ------------------------------------------------------------------ helpers

    def _apply_proxy(self, proxy: Optional[str]) -> None:
        """Forward ``proxy`` (possibly ``None``) to every crawler client.

        NOTE(review): the crawler instances are shared by every caller of this
        service, so concurrent ``process_single_item`` calls with different
        proxies may override each other -- confirm whether that matters.
        """
        self.boss_service.set_proxy(proxy)
        self.qcwy_service.set_proxy(proxy)
        self.zhilian_service.set_proxy(proxy)
        self.company_jobs_sync.set_proxy(proxy)

    async def _ensure_boss_token_loaded(self) -> None:
        """Load the newest active ``BossToken`` into ``boss_service`` if needed.

        The database is skipped while a token with an ``mpt`` value is loaded
        and is younger than ``_TOKEN_REFRESH_INTERVAL`` seconds. If no active
        token exists, a warning is logged and any previous login data is kept.
        """
        # Monotonic clock: the refresh interval must not be affected by
        # wall-clock adjustments (NTP sync, manual time changes).
        now = time.monotonic()
        if (
            self._boss_token_loaded
            and self.boss_service.login_data.get("mpt")
            and now - self._token_loaded_at < self._TOKEN_REFRESH_INTERVAL
        ):
            return
        token_obj = await BossToken.filter(is_active=True).order_by("-updated_at").first()
        if not token_obj:
            logger.warning("BossToken not found or inactive")
            return
        self.boss_service.set_login_data(token_obj.mpt or "", "")
        self._boss_token_loaded = True
        self._token_loaded_at = now

    async def get_data_router(self) -> IngestService:
        """Return the cached ``IngestService``, creating it on first use."""
        if self.data_router is None:
            client = await clickhouse_manager.get_client()
            self.data_router = IngestService(client)
        return self.data_router

    @staticmethod
    def _extract_id(pattern: "re.Pattern[str]", text: str) -> str:
        """Return capture group 1 of ``pattern`` in ``text``, or ``text`` unchanged."""
        match = pattern.search(text)
        return match.group(1) if match else text

    @classmethod
    def _normalize_qcwy_company_id(cls, company_id: str) -> str:
        """Strip the ``co`` prefix from 51job company IDs (``co123`` -> ``123``)."""
        match = cls._QCWY_COMPANY_ID_RE.match(company_id)
        return match.group(1) if match else company_id

    @staticmethod
    def _failure_response(target: str, error: Optional[str], storage_status: str) -> Dict[str, Any]:
        """Build the response dict used for every unsuccessful outcome."""
        return {
            "success": False,
            "target": target,
            "error": error,
            "storage_status": storage_status,
            "remote_sent": False,
        }

    @staticmethod
    def _build_response(target: str, result: Union[bool, Dict[str, Any], None]) -> Dict[str, Any]:
        """Normalise a handler result (falsy / ``True`` / dict) into a response dict."""
        if not result:
            return CleaningService._failure_response(
                target, "No data found or operation failed", "failed"
            )
        if isinstance(result, bool):
            # Only ``True`` reaches here; the handler gave no storage details.
            return {
                "success": True,
                "target": target,
                "error": None,
                "storage_status": "unknown",
                "remote_sent": False,
            }
        success = result.get("success", False)
        if result.get("duplicate"):
            storage_status = "duplicate"
        elif success:
            storage_status = "saved"
        else:
            # A result that reports failure must not be labelled "saved".
            # NOTE(review): company_jobs handlers return the raw job-sync
            # summary; confirm it carries a "success" key.
            storage_status = "failed"
        return {
            "success": success,
            "target": target,
            "error": None if success else result.get("message"),
            "storage_status": storage_status,
            "remote_sent": result.get("remote_sent", False),
            "data_summary": result.get("data_summary"),
            "jobs_summary": result.get("jobs_summary"),
            "original_data": result.get("original_data"),
        }

    def _resolve_handler(self, clean_type: str, platform: str) -> Any:
        """Map ``clean_type``/``platform`` to a coroutine function taking the target.

        Returns ``None`` for combinations this service does not support.
        """
        if clean_type == "auto" or (clean_type == "clean_url" and platform == "auto"):
            return self.clean_target_auto
        per_platform = {
            "clean_url": {
                "boss": self._process_boss_url,
                "qcwy": self._process_qcwy_url,
                "zhilian": self._process_zhilian_url,
            },
            "company_jobs": {
                "boss": self.clean_boss_company_jobs,
                "qcwy": self.clean_qcwy_company_jobs,
                "zhilian": self.clean_zhilian_company_jobs,
            },
        }
        if clean_type in per_platform:
            return per_platform[clean_type].get(platform)
        by_platform = {
            "job_id": self.clean_by_job_id,
            "company_name": self.clean_by_company_name,
            "company_id": self.clean_by_company_id,
        }.get(clean_type)
        if by_platform is None or platform not in self._PLATFORMS:
            return None
        return lambda value: by_platform(value, platform)

    async def _store_company_record(
        self,
        source: str,
        data: Dict[str, Any],
        company_id: str,
    ) -> Dict[str, Any]:
        """Upsert a company via ``company_storage`` and decorate the result.

        Adds ``duplicate``/``remote_sent`` (both ``False``), a status message and
        the raw payload as ``original_data``. Assumes ``upsert_company`` returns
        a mutable dict -- TODO confirm.
        """
        result = await company_storage.upsert_company(source, data, company_id=company_id)
        result["duplicate"] = False
        result["remote_sent"] = False
        result["message"] = "公司数据已写入MySQL"
        result["original_data"] = data
        return result

    async def _store_jobs(self, source: str, jobs: List[Any], original: Any) -> Union[bool, Any]:
        """Store every job and return the LAST storage result (or ``False``).

        A dict result gets ``original_data`` set to ``original``. Earlier
        per-job results are not reported.
        """
        router = await self.get_data_router()
        last_result = None
        for job in jobs:
            last_result = await router.store_single(source, "mini", "job", job)
        if not last_result:
            return False
        if isinstance(last_result, dict):
            last_result["original_data"] = original
        return last_result

    async def _sync_company_jobs_or_false(self, platform: str, company_id: str) -> Union[bool, Dict[str, Any]]:
        """Sync a company's jobs; return the summary only if any job was fetched."""
        result = await self.company_jobs_sync.sync_company_jobs(platform, company_id)
        if result and (result.get("jobs_fetched") or 0) > 0:
            return result
        return False

    async def _route_platform_url(
        self,
        url: str,
        platform: str,
        job_re: "re.Pattern[str]",
        company_re: "re.Pattern[str]",
    ) -> Union[bool, Dict[str, Any]]:
        """Dispatch a platform URL to the job or company cleaner by pattern."""
        job_match = job_re.search(url)
        if job_match:
            return await self.clean_by_job_id(job_match.group(1), platform)
        company_match = company_re.search(url)
        if company_match:
            return await self.clean_by_company_id(company_match.group(1), platform)
        # Neither pattern matched: treat the whole input as a job ID.
        return await self.clean_by_job_id(url, platform)

    # ------------------------------------------------------------- public API

    async def parse_file(self, file: UploadFile) -> List[str]:
        """Read cleaning targets from an uploaded ``.csv`` or plain-text file.

        CSV files contribute the first column of each non-empty row; any other
        file contributes one target per line. A leading UTF-8 byte-order mark
        is removed in both cases, whitespace is stripped and empty targets are
        dropped.

        Raises:
            UnicodeDecodeError: if the upload is not valid UTF-8.
        """
        content = await file.read()
        # "utf-8-sig" drops a leading BOM. With plain "utf-8" the text branch
        # kept "\ufeff" on the first target, since str.strip() does not remove it.
        text = content.decode("utf-8-sig")
        # UploadFile.filename may be None; compare the extension case-insensitively.
        filename = (file.filename or "").lower()
        if filename.endswith(".csv"):
            candidates = [row[0] for row in csv.reader(io.StringIO(text)) if row]
        else:
            candidates = text.splitlines()
        return [t for t in (c.strip() for c in candidates) if t]

    async def process_single_item(
        self,
        target: str,
        clean_type: str = "auto",
        platform: str = "auto",
        proxy: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Clean one target and normalise the outcome into a response dict.

        Args:
            target: URL, job ID, company ID or company name, depending on
                ``clean_type``.
            clean_type: ``auto``, ``clean_url``, ``job_id``, ``company_name``,
                ``company_id`` or ``company_jobs``.
            platform: ``auto``, ``boss``, ``qcwy`` or ``zhilian``.
            proxy: proxy forwarded to every crawler before fetching.

        Returns:
            A dict with ``success``, ``target``, ``error``, ``storage_status`` and
            ``remote_sent`` (plus summaries and ``original_data`` when the
            handler returned a dict). Exceptions are caught, logged with a
            traceback and reported with ``storage_status == "error"``.
        """
        try:
            await self._ensure_boss_token_loaded()
            self._apply_proxy(proxy)
            handler = self._resolve_handler(clean_type, platform)
            if handler is None:
                # Report unknown combinations explicitly instead of as "no data found".
                return self._failure_response(
                    target,
                    f"Unsupported clean_type/platform: {clean_type}/{platform}",
                    "failed",
                )
            result = await handler(target)
            return self._build_response(target, result)
        except Exception as e:
            logger.exception("Error processing item {}: {}", target, e)
            return self._failure_response(target, str(e), "error")

    async def clean_target_auto(self, target: str) -> Union[bool, Dict[str, Any]]:
        """Route ``target`` by domain; anything else is searched as a Boss company name."""
        if "zhipin.com" in target:
            return await self._process_boss_url(target)
        if "51job.com" in target:
            return await self._process_qcwy_url(target)
        if "zhaopin.com" in target:
            return await self._process_zhilian_url(target)
        return await self._process_search_company(target)

    async def clean_by_job_id(self, target: str, platform: str) -> Union[bool, Dict[str, Any]]:
        """Fetch one job (by ID or job URL) and store it.

        Returns the storage result with ``original_data`` set to the raw job
        payload, or ``False`` for an unknown platform, an empty fetch, or a
        non-dict storage result.
        """
        fetchers = {
            "boss": (self._BOSS_JOB_RE, self.boss_service.get_job_detail_by_id),
            "qcwy": (self._QCWY_JOB_RE, self.qcwy_service.get_job_detail),
            "zhilian": (self._ZHILIAN_JOB_RE, self.zhilian_service.get_job_detail),
        }
        entry = fetchers.get(platform)
        if entry is None:
            return False
        pattern, fetch = entry
        data = await asyncio.to_thread(fetch, self._extract_id(pattern, target))
        if not data:
            return False
        router = await self.get_data_router()
        result = await router.store_single(platform, "mini", "job", data)
        if result and isinstance(result, dict):
            result["original_data"] = data
            return result
        return False

    async def clean_by_company_name(self, target: str, platform: str) -> Union[bool, Dict[str, Any]]:
        """Search jobs by company name and store every job found.

        Returns the storage result of the LAST job (with ``original_data`` set to
        the raw search response), or ``False`` when nothing was found/stored or
        the platform is unknown.
        """
        if platform == "boss":
            res = await asyncio.to_thread(self.boss_service.search_jobs, target)
            zp_data = res.get("zpData") if res else None
            jobs = zp_data.get("list") if zp_data else None
        elif platform == "qcwy":
            res = await asyncio.to_thread(self.qcwy_service.search_jobs, target)
            jobs = res
        elif platform == "zhilian":
            res = await asyncio.to_thread(self.zhilian_service.search_company_jobs_by_name, target)
            payload = res.get("data") if isinstance(res, dict) else None
            # Guard every level: a malformed payload means "no jobs", not a crash.
            jobs = payload.get("list") if isinstance(payload, dict) else None
            if not isinstance(jobs, list):
                jobs = None
        else:
            return False
        if not jobs:
            return False
        return await self._store_jobs(platform, jobs, res)

    async def clean_by_company_id(self, target: str, platform: str) -> Union[bool, Dict[str, Any]]:
        """Fetch a company by ID, upsert it, then sync its job list.

        For ``qcwy`` a ``co`` prefix (``co123``) is stripped first. Returns the
        storage result carrying ``jobs_summary`` (the job-sync result) and
        ``original_data`` (the raw company payload), or ``False`` when the
        platform is unknown or the crawler returned nothing.
        """
        fetchers = {
            "boss": self.boss_service.get_company_detail_by_id,
            "qcwy": self.qcwy_service.get_company_info,
            "zhilian": self.zhilian_service.get_company_detail,
        }
        fetch = fetchers.get(platform)
        if fetch is None:
            return False
        company_id = self._normalize_qcwy_company_id(target) if platform == "qcwy" else target
        data = await asyncio.to_thread(fetch, company_id)
        if not data:
            return False
        result = await self._store_company_record(platform, data, company_id)
        result["jobs_summary"] = await self.company_jobs_sync.sync_company_jobs(platform, company_id)
        return result

    async def clean_boss_company_jobs(self, target: str) -> Union[bool, Dict[str, Any]]:
        """Sync all jobs of a Boss company given its ID or ``gongsi/<id>.html`` URL."""
        return await self._sync_company_jobs_or_false(
            "boss", self._extract_id(self._BOSS_COMPANY_RE, target)
        )

    async def clean_qcwy_company_jobs(self, target: str) -> Union[bool, Dict[str, Any]]:
        """Sync all jobs of a 51job company given ``123`` or ``co123``."""
        return await self._sync_company_jobs_or_false(
            "qcwy", self._normalize_qcwy_company_id(target)
        )

    async def clean_zhilian_company_jobs(self, target: str) -> Union[bool, Dict[str, Any]]:
        """Sync all jobs of a Zhilian company; ``target`` is used as the ID as-is."""
        return await self._sync_company_jobs_or_false("zhilian", target)

    async def _process_boss_url(self, url: str) -> Union[bool, Dict[str, Any]]:
        """Clean a zhipin.com job or company URL."""
        return await self._route_platform_url(url, "boss", self._BOSS_JOB_RE, self._BOSS_COMPANY_RE)

    async def _process_qcwy_url(self, url: str) -> Union[bool, Dict[str, Any]]:
        """Clean a 51job.com job or company URL."""
        return await self._route_platform_url(url, "qcwy", self._QCWY_JOB_RE, self._QCWY_COMPANY_RE)

    async def _process_zhilian_url(self, url: str) -> Union[bool, Dict[str, Any]]:
        """Clean a zhaopin.com job or company URL."""
        return await self._route_platform_url(
            url, "zhilian", self._ZHILIAN_JOB_RE, self._ZHILIAN_COMPANY_RE
        )

    async def _process_search_company(self, name: str) -> Union[bool, Dict[str, Any]]:
        """Treat free text as a company name and search it on Boss."""
        return await self.clean_by_company_name(name, "boss")