145 lines
5.7 KiB
Python
145 lines
5.7 KiB
Python
import asyncio
|
|
import re
|
|
import json
|
|
import httpx
|
|
from typing import List, Dict, Any, Optional
|
|
from loguru import logger
|
|
|
|
from app.core.clickhouse import clickhouse_manager
|
|
from app.services.crawler.qcwy import QcwyService
|
|
from app.services.job import DataRouterService, DataType, PlatformType
|
|
from app.settings.config import settings
|
|
|
|
# Regular expression that extracts the jobId from a job URL: it matches a
# "/" followed by a run of digits and ".html", exposing the digits as group 1.
JOB_ID_REGEX = re.compile(
    r'/(\d+)\.html',
)
|
|
|
|
class LinkRecleaner:
    """Re-process job links listed in a text file and push them to the remote server.

    For every URL the job id is extracted, the job payload is loaded from
    ClickHouse (or fetched through ``QcwyService`` on a cache miss), missing
    company information is filled in, and the result is pushed through
    ``DataRouterService``. URLs that were pushed successfully are removed
    from the input file after every batch.
    """

    def __init__(self):
        self.qcwy_service = QcwyService()
        # Built in ``init()`` because it needs an awaited ClickHouse client.
        self.data_router = None
        self.semaphore = asyncio.Semaphore(50)  # limit concurrency

    async def init(self):
        """Obtain a ClickHouse client and build the ``DataRouterService`` on top of it."""
        ch_client = await clickhouse_manager.get_client()
        self.data_router = DataRouterService(ch_client)

    async def get_job_id_from_url(self, url: str) -> Optional[str]:
        """Return the job id captured by ``JOB_ID_REGEX`` in ``url``, or None if it does not match."""
        match = JOB_ID_REGEX.search(url)
        return match.group(1) if match else None

    @staticmethod
    def _sql_string_literal(value: str) -> str:
        """Return ``value`` as a single-quoted SQL string literal.

        Backslashes and single quotes are backslash-escaped so the value
        cannot terminate the literal early.
        """
        escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    @staticmethod
    def _write_remaining(file_path: str, urls) -> None:
        """Replace ``file_path`` with a ``url`` header line followed by ``urls``.

        The content is written to a sibling ``.tmp`` file first and then moved
        over the original with ``os.replace``, so an interruption during the
        write cannot leave a truncated URL list behind.
        """
        import os

        tmp_path = f"{file_path}.tmp"
        with open(tmp_path, 'w') as f:
            f.write('url\n')
            f.writelines(f"{url}\n" for url in urls)
        os.replace(tmp_path, file_path)

    async def fetch_from_clickhouse(self, job_id: str) -> Optional[Dict[str, Any]]:
        """Query the cached payload for ``job_id`` from ClickHouse.

        Returns:
            The JSON-decoded ``json_data`` column of the first matching row,
            or None when no row matches or the query/decoding raises (the
            error is logged, not propagated).
        """
        # The id is escaped rather than interpolated raw; for purely numeric
        # ids the resulting query text is unchanged.
        # NOTE(review): switch to bound parameters if clickhouse_manager.execute
        # supports them — its signature is not visible from this file.
        query = (
            "SELECT json_data FROM job_data.qcwy_job "
            f"WHERE job_id = {self._sql_string_literal(job_id)} LIMIT 1"
        )
        try:
            result = await clickhouse_manager.execute(query)
            if result.result_rows:
                return json.loads(result.result_rows[0][0])
        except Exception as e:
            logger.error(f"ClickHouse query error for {job_id}: {e}")
        return None

    def _fill_company_info(self, data: Dict[str, Any]) -> None:
        """Fill ``company_desc`` / ``companyHref`` in place when the description is missing.

        The fields are read from and written to ``data["detailJobInfo"]`` when
        that value is a dict, otherwise ``data`` itself. Lookup failures are
        logged and otherwise ignored (best effort).
        """
        nested = data.get("detailJobInfo")
        detail = nested if isinstance(nested, dict) else data
        co_id = detail.get("coId")
        if not co_id or detail.get("company_desc") or detail.get("coDescription"):
            return
        try:
            logger.info(f"Filling company info for coId {co_id}...")
            # NOTE(review): called without await, like get_job_detail; if this
            # does blocking I/O it stalls the event loop — confirm.
            co_info = self.qcwy_service.get_company_info(str(co_id))
            if co_info:
                detail["company_desc"] = (co_info.get("coinfo", {}) or {}).get("coinfo", "")
                detail["companyHref"] = (co_info.get("share", {}) or {}).get("weixinshareurl", "")
        except Exception as e:
            logger.error(f"Failed to fill company info for {co_id}: {e}")

    async def process_link(self, url: str) -> bool:
        """Process one URL; return whether it was processed and pushed successfully."""
        async with self.semaphore:
            job_id = await self.get_job_id_from_url(url)
            if not job_id:
                logger.warning(f"Invalid URL: {url}")
                return False

            # 1. Try to get the payload from ClickHouse.
            data = await self.fetch_from_clickhouse(job_id)
            source = "ClickHouse"

            # 2. Not in the database: crawl it live.
            if not data:
                logger.info(f"Cache miss for {job_id}, crawling...")
                try:
                    # NOTE(review): called without await; if get_job_detail does
                    # blocking I/O it stalls the event loop and the semaphore
                    # gives no real concurrency — confirm.
                    data = self.qcwy_service.get_job_detail(job_id)
                    source = "Crawler"
                    if data:
                        # Store it so the next run can use the cached copy.
                        await self.data_router.store_data(
                            data, DataType.JOB, PlatformType.QCWY, check_duplicate=True
                        )
                except Exception as e:
                    logger.error(f"Crawl failed for {job_id}: {e}")
                    return False

            if not data:
                logger.warning(f"No data found for {job_id}")
                return False

            # The steps below call ``.get`` on the payload; a non-dict value
            # (e.g. a JSON list in the cache) would otherwise raise here.
            if not isinstance(data, dict):
                logger.warning(f"Unexpected payload type for {job_id}: {type(data).__name__}")
                return False

            # 3. Fill in missing company information.
            self._fill_company_info(data)

            try:
                # 4. Prepare the push payload.
                remote_data = await self.data_router._prepare_remote_push_data(
                    data, DataType.JOB, PlatformType.QCWY
                )
                if not remote_data:
                    logger.warning(f"Failed to prepare push data for {job_id}")
                    return False

                # 5. Send it to the third party.
                success = await self.data_router.send_to_remote_server(remote_data)
                status = "✅ Success" if success else "❌ Failed"
                logger.info(f"[{source}] Push {job_id}: {status}")
                return success
            except Exception as e:
                logger.error(f"Push error for {job_id}: {e}")
                return False

    async def run(self, file_path: str):
        """Process every URL listed in ``file_path`` in batches of 50.

        Blank lines and a ``url`` header line are skipped. After each batch
        the file is rewritten with a ``url`` header followed by every URL that
        has not been pushed successfully so far, so an interrupted run can be
        resumed from the file.
        """
        await self.init()

        with open(file_path, 'r') as f:
            stripped = (line.strip() for line in f)
            # Initial URL list.
            all_urls = [entry for entry in stripped if entry and entry != 'url']
        successful_urls = set()

        logger.info(f"Starting to process {len(all_urls)} links from {file_path}")

        # Process in batches and write progress back to the file as we go.
        batch_size = 50
        for i in range(0, len(all_urls), batch_size):
            batch_urls = all_urls[i:i + batch_size]
            # return_exceptions=True keeps one unexpected failure from aborting
            # the run before this batch's successes are recorded.
            results = await asyncio.gather(
                *(self.process_link(url) for url in batch_urls),
                return_exceptions=True,
            )

            # Record the URLs that succeeded.
            for url, result in zip(batch_urls, results):
                if isinstance(result, BaseException):
                    logger.error(f"Unhandled error while processing {url}: {result!r}")
                elif result:
                    successful_urls.add(url)

            # Write back once per finished batch to guard against interruption.
            remaining_urls = [url for url in all_urls if url not in successful_urls]
            self._write_remaining(file_path, remaining_urls)

            logger.info(f"Batch {i // batch_size + 1} done. {len(remaining_urls)} remaining.")
|
|
|
|
async def main():
    """Run a ``LinkRecleaner`` over the ``jobs_spider/qcwy.txt`` link list."""
    await LinkRecleaner().run("jobs_spider/qcwy.txt")
|
|
|
|
# Start the re-cleaning job only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
|