JobData/app/services/company_cleaner.py

646 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import json
import random
from datetime import datetime
from typing import Any, Dict, List, Optional
from loguru import logger
from app.core.clickhouse import clickhouse_manager
from app.models.token import BossToken
from app.services.crawler.boss import BossService
from app.services.crawler.qcwy import QcwyService
from app.services.crawler.zhilian import ZhilianService
class CompanyCleaner:
    """Queue company ids found in crawled job postings and fetch their company details.

    All tables live in the ClickHouse ``job_data`` database:

    1. ``collect_pending_companies`` reads recent ``<source>_job`` rows, drops ids
       already saved in ``<source>_company`` (recent window) or already present in
       ``pending_company``, and inserts the rest into ``pending_company`` with
       status ``pending`` and ``version`` 1.
    2. ``process_pending_companies`` / ``process_single_company`` fetch a queued
       company from its source site, store the raw JSON in ``<source>_company``
       and append a new ``pending_company`` row with ``version + 1`` and status
       ``done`` or ``failed``.
    3. ``cleanup_old_records`` deletes ``done`` / ``failed`` rows from
       ``pending_company``.

    NOTE(review): the ``version`` column together with ``SELECT ... FINAL`` suggests
    ``pending_company`` is a ReplacingMergeTree keyed on (source, company_id) —
    confirm against the table DDL.
    """

    # Sources handled by every collector / fetcher in this class.
    _SUPPORTED_SOURCES = ("zhilian", "qcwy", "boss")

    # Column order used for every insert into job_data.pending_company.
    _PENDING_COLUMNS = (
        "source",
        "company_id",
        "company_name",
        "status",
        "error_msg",
        "created_at",
        "updated_at",
        "version",
    )

    def __init__(self):
        self.boss_service = BossService()
        self.qcwy_service = QcwyService()
        self.zhilian_service = ZhilianService()
        # Becomes True once a Boss token has been loaded into boss_service.
        self._boss_token_loaded = False

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _sql_literal(value: Any) -> str:
        """Return *value* rendered as a single-quoted ClickHouse string literal.

        Backslashes and single quotes are escaped, so caller-supplied values
        (for example a company id received from an API request) cannot break
        out of the literal and alter the query.
        """
        text = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return f"'{text}'"

    @staticmethod
    def _is_memory_error(exc: Exception) -> bool:
        """Return True when *exc*'s message mentions memory (e.g. MEMORY_LIMIT_EXCEEDED)."""
        # "memory" is a substring of "memory_limit", so one check covers both.
        return "memory" in str(exc).lower()

    def _apply_proxy(self, proxy: Optional[str]) -> None:
        """Route all three crawler services through *proxy*."""
        self.boss_service.set_proxy(proxy)
        self.qcwy_service.set_proxy(proxy)
        self.zhilian_service.set_proxy(proxy)

    async def _ensure_boss_token_loaded(self) -> None:
        """Load the most recently updated active ``BossToken`` into ``boss_service``.

        Skipped when a token was already loaded and the service still holds an
        ``mpt`` login value. When no active token exists a warning is logged and
        the service is left unchanged.
        """
        if self._boss_token_loaded and self.boss_service.login_data.get("mpt"):
            return
        token_obj = await BossToken.filter(is_active=True).order_by("-updated_at").first()
        if not token_obj:
            logger.warning("BossToken not found or inactive in CompanyCleaner")
            return
        self.boss_service.set_login_data(token_obj.mpt or "", "")
        self._boss_token_loaded = True

    # --------------------------------------------------------------- collection

    async def collect_pending_companies(self, limit: int = 1000, source: Optional[str] = None):
        """Queue up to *limit* new companies per source into ``job_data.pending_company``.

        Args:
            limit: Maximum number of companies queued for each source.
            source: ``"zhilian"``, ``"qcwy"`` or ``"boss"``; ``None`` collects all three.
        """
        client = await clickhouse_manager.get_client()
        logger.info(f"Starting to collect pending companies (limit={limit}, source={source or 'all'})...")
        if source is not None and source not in self._SUPPORTED_SOURCES:
            logger.warning(f"Unknown source '{source}', nothing to collect.")
        if source is None or source == "zhilian":
            await self._collect_zhilian(client, limit)
        if source is None or source == "qcwy":
            await self._collect_qcwy(client, limit)
        if source is None or source == "boss":
            await self._collect_boss(client, limit)
        logger.info("Finished collecting pending companies.")

    @staticmethod
    def _existing_cids_queries(company_table: str, id_key: str) -> List[str]:
        """Build the progressively cheaper queries that list already-saved company ids.

        1. last 90 days, LIMIT 50000;
        2. (after a memory error) last 1 day, LIMIT 5000;
        3. (after a second memory error) last 1 day with SAMPLE 0.1, LIMIT 2000.

        The time filter is in PREWHERE so json_data of older rows is not read.
        """

        def build(days: int, sample: str, row_limit: int) -> str:
            return f"""
            SELECT DISTINCT JSONExtractString(json_data, '{id_key}') as cid
            FROM job_data.{company_table}{sample}
            PREWHERE updated_at > now() - INTERVAL {days} DAY
            AND json_data != ''
            WHERE JSONExtractString(json_data, '{id_key}') != ''
            LIMIT {row_limit}
            """

        return [
            build(90, "", 50000),
            build(1, "", 5000),
            build(1, " SAMPLE 0.1", 2000),
        ]

    async def _query_existing_cids(self, client, label: str, company_table: str, id_key: str) -> set:
        """Return company ids already saved in *company_table* so they are not re-fetched.

        Memory errors trigger a retry with a cheaper query (see
        ``_existing_cids_queries``). If every attempt hits a memory error an empty
        set is returned, so collection continues without this exclusion.

        Raises:
            Exception: any non-memory error from the ClickHouse client is re-raised.
        """
        queries = self._existing_cids_queries(company_table, id_key)
        for attempt, query in enumerate(queries):
            try:
                logger.info(f"Querying existing {label} companies (attempt {attempt+1})...")
                result = await client.query(query)
                return {row[0] for row in result.result_rows if row[0]}
            except Exception as e:
                if not self._is_memory_error(e):
                    logger.error(f"Non-memory error while querying existing companies: {e}")
                    raise
                if attempt == 0:
                    logger.warning("Memory error, reducing time range to 1 days")
                elif attempt == 1:
                    logger.warning("Memory error persists, using SAMPLE 0.1")
                else:
                    logger.error(f"Failed to query existing companies after {attempt+1} attempts: {e}")
                    logger.warning("Using empty set for existing_cids, continuing with collection...")
        return set()

    async def _load_pending_cids(self, client, source: str) -> set:
        """Return every company id already in ``pending_company`` for *source* (any status)."""
        query = (
            "SELECT DISTINCT company_id FROM job_data.pending_company "
            f"WHERE source = {self._sql_literal(source)}"
        )
        result = await client.query(query)
        return {row[0] for row in result.result_rows if row[0]}

    @staticmethod
    def _build_pending_rows(source: str, result_rows: Any, exclude_cids: set, limit: int) -> List[Dict[str, Any]]:
        """Turn ``(cid, cname)`` query rows into at most *limit* new queue rows.

        Rows with an empty cid, a cid in *exclude_cids*, or a cid already accepted
        earlier in *result_rows* are skipped. ``SELECT DISTINCT cid, cname`` can
        return the same cid under several names, so the seen-set is what prevents
        duplicate queue entries.
        """
        rows: List[Dict[str, Any]] = []
        seen: set = set()
        for cid, cname in result_rows:
            if not cid or cid in exclude_cids or cid in seen:
                continue
            if len(rows) >= limit:
                break
            seen.add(cid)
            now = datetime.now()
            rows.append(
                {
                    "source": source,
                    "company_id": cid,
                    "company_name": cname,
                    "status": "pending",
                    "created_at": now,
                    "updated_at": now,
                }
            )
        return rows

    async def _collect_from_recent_jobs(
        self,
        client,
        limit: int,
        *,
        source: str,
        label: str,
        id_key: str,
        name_key: str,
    ) -> None:
        """Queue unseen companies found in the last 30 days of ``job_data.<source>_job``.

        Shared by the Zhilian and Boss collectors. For both, the job table and the
        company table store the company id under the same JSON key (*id_key*).
        """
        logger.info(f"Collecting {label} companies...")
        existing_cids = await self._query_existing_cids(client, label, f"{source}_company", id_key)
        pending_cids = await self._load_pending_cids(client, source)
        exclude_cids = existing_cids | pending_cids
        # The time filter sits in PREWHERE so json_data of older rows is not read.
        # limit * 2 rows are fetched so enough remain after filtering in Python.
        query_limit = limit * 2
        query = f"""
        SELECT DISTINCT
        JSONExtractString(json_data, '{id_key}') as cid,
        JSONExtractString(json_data, '{name_key}') as cname
        FROM job_data.{source}_job
        PREWHERE created_at > now() - INTERVAL 30 DAY
        WHERE json_data != ''
        AND JSONExtractString(json_data, '{id_key}') != ''
        LIMIT {query_limit}
        """
        logger.info(f"Executing SQL for {label} (limit={query_limit}): {query[:500]}...")
        result = await client.query(query)
        if not result.result_rows:
            return
        rows = self._build_pending_rows(source, result.result_rows, exclude_cids, limit)
        await self._insert_pending(client, rows)
        logger.info(f"Added {len(rows)} {label} companies to pending.")

    async def _collect_zhilian(self, client, limit: int):
        """Queue unseen Zhilian companies (JSON keys ``companyNumber`` / ``companyName``)."""
        await self._collect_from_recent_jobs(
            client,
            limit,
            source="zhilian",
            label="Zhilian",
            id_key="companyNumber",
            name_key="companyName",
        )

    async def _collect_qcwy(self, client, limit: int):
        """Queue unseen QCWY companies.

        The qcwy_job scan is memory-heavy, so it is kept small:
        - 7-day window and at most ``min(limit * 2, 100)`` rows;
        - after a memory error, a 3-day window and at most 50 rows;
        - after a second memory error, the same with ``SAMPLE 0.1``.
        ``length(json_data)`` is deliberately not used as a filter, because it
        would have to read the whole column.

        Raises:
            Exception: non-memory errors, or a memory error on the last attempt.
        """
        logger.info("Collecting QCWY companies...")
        existing_cids = await self._query_existing_cids(client, "QCWY", "qcwy_company", "companyId")
        pending_cids = await self._load_pending_cids(client, "qcwy")
        exclude_cids = existing_cids | pending_cids

        days_back = 7
        query_limit = min(limit * 2, 100)
        result = None
        for attempt in range(3):
            sample = ""
            if attempt == 1:
                days_back = 3
                query_limit = min(query_limit, 50)
                logger.warning(f"Retry {attempt}: Reducing time range to {days_back} days and limit to {query_limit}")
            elif attempt == 2:
                # Keeps the reduced window/limit from attempt 1 and adds sampling.
                sample = " SAMPLE 0.1"
                logger.warning(f"Retry {attempt}: Using SAMPLE 0.1 to reduce memory usage")
            query = f"""
            SELECT DISTINCT
            JSONExtractString(json_data, 'coId') as cid,
            JSONExtractString(json_data, 'companyName') as cname
            FROM job_data.qcwy_job{sample}
            PREWHERE created_at > now() - INTERVAL {days_back} DAY
            AND json_data != ''
            WHERE JSONExtractString(json_data, 'coId') != ''
            LIMIT {query_limit}
            """
            logger.info(f"Executing SQL for QCWY (limit={query_limit}, days={days_back}, attempt={attempt+1}): {query[:400]}...")
            try:
                result = await client.query(query)
                break
            except Exception as e:
                if not self._is_memory_error(e):
                    logger.error(f"Query failed with non-memory error: {e}")
                    raise
                if attempt >= 2:
                    logger.error(f"Query failed after {attempt+1} attempts: {e}")
                    raise
                logger.warning(f"Memory error on attempt {attempt+1}: {e}")

        if result is None or not result.result_rows:
            logger.info("No QCWY companies found in query result.")
            return
        rows = self._build_pending_rows("qcwy", result.result_rows, exclude_cids, limit)
        if rows:
            await self._insert_pending(client, rows)
            logger.info(f"Added {len(rows)} QCWY companies to pending.")
        else:
            logger.info("No new QCWY companies found after filtering.")

    async def _collect_boss(self, client, limit: int):
        """Queue unseen Boss companies (JSON keys ``brandId`` / ``brandName``)."""
        await self._collect_from_recent_jobs(
            client,
            limit,
            source="boss",
            label="Boss",
            id_key="brandId",
            name_key="brandName",
        )

    async def _insert_pending(self, client, rows: List[Dict[str, Any]]):
        """Insert rows into ``job_data.pending_company``.

        Each row must provide ``source``, ``company_id``, ``company_name``,
        ``status``, ``created_at`` and ``updated_at``. ``error_msg`` defaults to
        ``""`` and ``version`` to ``1`` when absent. An empty list is a no-op.
        """
        if not rows:
            return
        data: List[List[Any]] = [
            [
                r["source"],
                r["company_id"],
                r["company_name"],
                r["status"],
                r.get("error_msg", ""),
                r["created_at"],
                r["updated_at"],
                r.get("version", 1),
            ]
            for r in rows
        ]
        await client.insert(
            "job_data.pending_company",
            data,
            column_names=list(self._PENDING_COLUMNS),
        )

    # --------------------------------------------------------------- processing

    async def _record_status(
        self,
        client,
        source: str,
        cid: str,
        cname: str,
        status: str,
        error_msg: str,
        version: int,
    ) -> None:
        """Append a new ``pending_company`` version carrying the processing outcome.

        ``client.insert`` sends column values, not SQL text, so *error_msg* is
        stored verbatim; quote-escaping it here would corrupt the stored message.
        """
        now = datetime.now()
        await self._insert_pending(
            client,
            [
                {
                    "source": source,
                    "company_id": cid,
                    "company_name": cname,
                    "status": status,
                    "error_msg": error_msg,
                    "created_at": now,
                    "updated_at": now,
                    "version": version,
                }
            ],
        )

    async def _run_fetch(self, source: str, cid: str, max_delay_seconds: int = 0):
        """Optionally sleep, then fetch and save one company.

        Returns:
            ``(status, error_msg)``: ``("done", "")`` on success,
            ``("failed", "Fetch failed")`` when nothing was saved, or
            ``("failed", str(exc))`` when an exception occurred (including
            during the random delay).
        """
        try:
            if max_delay_seconds and max_delay_seconds > 0:
                # randint(1, n) is always >= 1, so a positive setting always sleeps.
                await asyncio.sleep(random.randint(1, max_delay_seconds))
            success = await self._fetch_and_save(source, cid)
        except Exception as e:
            logger.error(f"Error processing {source} {cid}: {e}")
            return "failed", str(e)
        return ("done", "") if success else ("failed", "Fetch failed")

    async def process_single_company(
        self,
        source: str,
        company_id: str,
        proxy: Optional[str] = None,
        max_delay_seconds: int = 5,
    ) -> Dict[str, Any]:
        """Fetch one company now and record the outcome in ``pending_company``.

        The company does not need to be queued. When no queue row exists,
        processing starts from version 0 with an empty company name.

        Args:
            source: ``"zhilian"``, ``"qcwy"`` or ``"boss"``.
            company_id: Source-specific company id (escaped before use in SQL).
            proxy: When given, applied to all crawler services before fetching.
            max_delay_seconds: Sleep a random 1..N seconds first; 0 disables it.

        Returns:
            A dict with ``success``, ``source``, ``company_id``, ``company_name``,
            ``status``, ``error_msg`` and the newly written ``version``.
        """
        client = await clickhouse_manager.get_client()
        if proxy:
            self._apply_proxy(proxy)
        if max_delay_seconds and max_delay_seconds > 0:
            await asyncio.sleep(random.randint(1, max_delay_seconds))
        query = f"""
        SELECT source, company_id, company_name, version
        FROM job_data.pending_company
        FINAL
        WHERE source = {self._sql_literal(source)} AND company_id = {self._sql_literal(company_id)}
        ORDER BY version DESC
        LIMIT 1
        """
        result = await client.query(query)
        if result.result_rows:
            source_value, cid, cname, version = result.result_rows[0]
        else:
            source_value, cid, cname, version = source, company_id, "", 0

        status, error_msg = await self._run_fetch(source_value, cid)
        new_version = int(version) + 1
        await self._record_status(client, source_value, cid, cname, status, error_msg, new_version)
        return {
            "success": status == "done",
            "source": source_value,
            "company_id": cid,
            "company_name": cname,
            "status": status,
            "error_msg": error_msg,
            "version": new_version,
        }

    async def process_pending_companies(
        self,
        limit: int = 100,
        source: Optional[str] = None,
        proxy: Optional[str] = None,
        max_delay_seconds: int = 0,
    ):
        """Process up to *limit* ``pending`` companies, oldest ``created_at`` first.

        Each company is fetched sequentially, and a new version with status
        ``done``/``failed`` is appended to ``pending_company``.

        Args:
            limit: Maximum number of queue rows to process.
            source: Restrict to one source; ``None`` processes all sources.
            proxy: When given, applied to all crawler services first.
            max_delay_seconds: Random 1..N second sleep before each company; 0 disables it.
        """
        client = await clickhouse_manager.get_client()
        logger.info(f"Processing pending companies (limit={limit}, source={source or 'all'})...")
        if proxy:
            self._apply_proxy(proxy)
        where_clause = "WHERE status = 'pending'"
        if source:
            where_clause += f" AND source = {self._sql_literal(source)}"
        query = f"""
        SELECT source, company_id, company_name, version
        FROM job_data.pending_company
        FINAL
        {where_clause}
        ORDER BY created_at ASC
        LIMIT {int(limit)}
        """
        result = await client.query(query)
        if not result.result_rows:
            logger.info("No pending companies to process.")
            return
        for source_value, cid, cname, version in result.result_rows:
            logger.info(f"Processing {source_value} company: {cname} ({cid})")
            status, error_msg = await self._run_fetch(source_value, cid, max_delay_seconds)
            await self._record_status(client, source_value, cid, cname, status, error_msg, int(version) + 1)

    async def _fetch_and_save(self, source: str, company_id: str) -> bool:
        """Fetch a company's detail JSON from *source* and insert it into ``job_data.<source>_company``.

        Returns:
            True when a row was inserted. False for an unsupported source or when
            the crawler returned no data.

        NOTE(review): the crawler calls below are not awaited, so they run
        synchronously and block the event loop while the HTTP request is in flight.
        """
        if source == "zhilian":
            data = self.zhilian_service.get_company_detail(company_id)
        elif source == "qcwy":
            data = self.qcwy_service.get_company_info(company_id)
        elif source == "boss":
            await self._ensure_boss_token_loaded()
            data = self.boss_service.get_company_detail_by_id(company_id)
        else:
            logger.error(f"Unsupported source={source} company_id={company_id}")
            return False
        if not data:
            logger.error(f"No data returned from source={source} company_id={company_id}")
            return False

        # Serialize once; the same string is logged (truncated) and stored.
        json_str = json.dumps(data, ensure_ascii=False)
        logger.info(f"Raw company data from source={source} company_id={company_id}: {json_str[:2000]}")

        # Each source keeps the company name under a different key.
        if source == "zhilian":
            name = (data.get("companyBase") or {}).get("companyName") or ""
        elif source == "qcwy":
            name = data.get("companyName") or ""
        else:
            name = data.get("name") or ""

        client = await clickhouse_manager.get_client()
        now = datetime.now()
        await client.insert(
            f"job_data.{source}_company",
            [[0, json_str, name, now, now]],
            column_names=["id", "json_data", "company_name", "created_at", "updated_at"],
        )
        return True

    async def cleanup_old_records(self):
        """Delete ``done`` and ``failed`` rows from ``pending_company`` (intended to run daily).

        The ALTER ... DELETE is a ClickHouse mutation, which is applied
        asynchronously. Failures are logged, not raised.

        NOTE(review): if an older ``pending`` version of the same company has not
        been merged away yet, deleting only the newer done/failed version may make
        the pending version visible again under FINAL — confirm the table engine
        and merge behaviour.
        """
        client = await clickhouse_manager.get_client()
        logger.info("Starting cleanup of processed pending companies...")
        query = "ALTER TABLE job_data.pending_company DELETE WHERE status IN ('done', 'failed')"
        try:
            await client.command(query)
            logger.info("Cleanup command executed successfully.")
        except Exception as e:
            logger.error(f"Cleanup failed: {e}")
# Shared module-level CompanyCleaner instance.
company_cleaner: CompanyCleaner = CompanyCleaner()