import asyncio
import json
import os
import re
from typing import List, Dict, Any, Optional

import httpx
from loguru import logger

from app.core.clickhouse import clickhouse_manager
from app.services.crawler.qcwy import QcwyService
from app.services.ingest import IngestService
from app.services.ingest.remote_push import push_to_remote
from app.services.ingest.configs.qcwy import _build_qcwy_push
from app.settings.config import settings

# Captures the run of digits that sits between a "/" and ".html" in a job URL.
JOB_ID_REGEX = re.compile(r'/(\d+)\.html')


class LinkRecleaner:
    """Re-process a file of job URLs and push each job to the remote endpoint.

    For every URL the job payload is looked up in ClickHouse first; on a miss
    it is fetched through ``QcwyService`` and stored via ``IngestService``.
    URLs whose push succeeded are removed from the input file after each
    batch, so the file always lists what is still pending.
    """

    def __init__(self):
        self.qcwy_service = QcwyService()
        # Created in init(), because obtaining the ClickHouse client is async.
        self.ingest_service = None
        # Caps the number of links processed concurrently.
        self.semaphore = asyncio.Semaphore(50)

    async def init(self):
        """Obtain the ClickHouse client and build the ingest service."""
        ch_client = await clickhouse_manager.get_client()
        self.ingest_service = IngestService(ch_client)

    async def get_job_id_from_url(self, url: str) -> Optional[str]:
        """Return the numeric job id embedded in *url*, or None if absent."""
        match = JOB_ID_REGEX.search(url)
        return match.group(1) if match else None

    async def fetch_from_clickhouse(self, job_id: str) -> Optional[Dict[str, Any]]:
        """Look up the cached job payload in ClickHouse.

        Returns the decoded ``json_data`` of the first matching row, or None
        when there is no row or the query/decoding raised (the error is
        logged, not propagated).
        """
        # NOTE(review): job_id is interpolated straight into the SQL text.
        # process_link only passes ids matched by JOB_ID_REGEX (digits), but
        # other callers must not pass untrusted text here.
        query = f"SELECT json_data FROM job_data.qcwy_job WHERE job_id = '{job_id}' LIMIT 1"
        try:
            result = await clickhouse_manager.execute(query)
            if result.result_rows:
                return json.loads(result.result_rows[0][0])
        except Exception as e:
            logger.error(f"ClickHouse query error for {job_id}: {e}")
        return None

    async def process_link(self, url: str) -> bool:
        """Process one URL; return whether it was pushed successfully."""
        async with self.semaphore:
            job_id = await self.get_job_id_from_url(url)
            if not job_id:
                logger.warning(f"Invalid URL: {url}")
                return False

            # 1. Try the ClickHouse cache first.
            data = await self.fetch_from_clickhouse(job_id)
            source = "ClickHouse"

            # 2. Not cached: crawl it now.
            if not data:
                logger.info(f"Cache miss for {job_id}, crawling...")
                try:
                    # NOTE(review): this call is not awaited, so it appears to
                    # be synchronous and would hold the event loop while it
                    # runs - confirm whether that is intended.
                    data = self.qcwy_service.get_job_detail(job_id)
                    source = "Crawler"
                    if data:
                        # Store it so the next run can hit the cache.
                        await self.ingest_service.store_single(
                            "qcwy", "mini", "job", data, check_duplicate=True
                        )
                except Exception as e:
                    logger.error(f"Crawl failed for {job_id}: {e}")
                    return False

            if not data:
                logger.warning(f"No data found for {job_id}")
                return False

            # 3. Fill in company info when the payload lacks a description.
            # `detail` aliases either the nested "detailJobInfo" dict or the
            # payload itself, so the writes below are visible through `data`.
            detail = data.get("detailJobInfo") if isinstance(data.get("detailJobInfo"), dict) else data
            co_id = detail.get("coId")
            if co_id and not (detail.get("company_desc") or detail.get("coDescription")):
                try:
                    logger.info(f"Filling company info for coId {co_id}...")
                    co_info = self.qcwy_service.get_company_info(str(co_id))
                    if co_info:
                        detail["company_desc"] = (co_info.get("coinfo", {}) or {}).get("coinfo", "")
                        detail["companyHref"] = (co_info.get("share", {}) or {}).get("weixinshareurl", "")
                except Exception as e:
                    # Best effort: a failed company lookup does not block the push.
                    logger.error(f"Failed to fill company info for {co_id}: {e}")

            # 4. Build the push payload.
            try:
                remote_data = _build_qcwy_push(data)
                if remote_data:
                    # 5. Send it to the third party.
                    success = await push_to_remote(remote_data)
                    status = "Success" if success else "Failed"
                    logger.info(f"[{source}] Push {job_id}: {status}")
                    return success
                else:
                    logger.warning(f"Failed to prepare push data for {job_id}")
                    return False
            except Exception as e:
                logger.error(f"Push error for {job_id}: {e}")
                return False

    @staticmethod
    def _read_urls(file_path: str) -> List[str]:
        """Return the non-empty lines of *file_path*, skipping the 'url' header."""
        with open(file_path, 'r', encoding='utf-8') as f:
            stripped = (line.strip() for line in f)
            return [line for line in stripped if line and line != 'url']

    @staticmethod
    def _write_urls(file_path: str, urls: List[str]) -> None:
        """Replace *file_path* with a 'url' header followed by *urls*.

        The content is written to a sibling temp file and then swapped in
        with os.replace, so an interruption during the write cannot leave the
        URL list truncated.
        """
        tmp_path = f"{file_path}.tmp"
        with open(tmp_path, 'w', encoding='utf-8') as f:
            f.write('url\n')
            f.writelines(f"{url}\n" for url in urls)
        os.replace(tmp_path, file_path)

    async def run(self, file_path: str):
        """Process every URL listed in *file_path*, batch by batch.

        After each batch the file is rewritten to contain only the URLs that
        have not been pushed successfully yet.
        """
        await self.init()

        all_urls = self._read_urls(file_path)
        successful_urls = set()
        logger.info(f"Starting to process {len(all_urls)} links from {file_path}")

        batch_size = 50
        for i in range(0, len(all_urls), batch_size):
            batch_urls = all_urls[i:i + batch_size]
            tasks = [self.process_link(url) for url in batch_urls]
            # Collect exceptions instead of letting the first one abort the
            # run: otherwise the successes of this batch would never be
            # recorded and those links would be pushed again on the next run.
            results = await asyncio.gather(*tasks, return_exceptions=True)

            for url, outcome in zip(batch_urls, results):
                if isinstance(outcome, BaseException):
                    logger.error(f"Unhandled error for {url}: {outcome}")
                elif outcome:
                    successful_urls.add(url)

            # Persist progress after every batch so an interrupted run can
            # resume from the remaining URLs.
            remaining_urls = [url for url in all_urls if url not in successful_urls]
            self._write_urls(file_path, remaining_urls)
            logger.info(f"Batch {i // batch_size + 1} done. {len(remaining_urls)} remaining.")


async def main():
    """Run the recleaner over the default qcwy URL list."""
    recleaner = LinkRecleaner()
    input_file = "jobs_spider/qcwy.txt"
    await recleaner.run(input_file)


if __name__ == "__main__":
    asyncio.run(main())