import asyncio
import functools
import hashlib
import json
import re
import time
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional

import requests
from clickhouse_connect.driver import AsyncClient
from tenacity import retry, stop_after_attempt, wait_exponential

from app.log import logger

# Aliases accepted by query_json_data; anything else is rejected to avoid SQL injection.
_SQL_IDENTIFIER_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


class DataType(str, Enum):
    """Kind of record being stored."""
    JOB = "job"
    COMPANY = "company"


class PlatformType(str, Enum):
    """Source recruiting platform."""
    BOSS = "boss"
    QCWY = "qcwy"
    ZHILIAN = "zhilian"


class DataRouterService:
    """Generic data router.

    Picks the ClickHouse table ``job_data.<platform>_<data_type>`` for each record,
    stores the raw record as JSON alongside a few dedup columns, performs
    platform-specific duplicate checks, and pushes job records to a remote server.
    """

    def __init__(self, clickhouse_client: AsyncClient):
        self.clickhouse_client = clickhouse_client
        # Platform-specific repositories were removed in favour of this generic JSON interface.

    async def safe_join(self, data, default=""):
        """Join a list into a comma-separated string, skipping falsy items.

        ``None`` yields ``default``; a non-list value is stringified, or ``default`` when falsy.
        """
        if data is None:
            return default
        if isinstance(data, list):
            return ",".join(str(item) for item in data if item)
        return str(data) if data else default

    async def safe_get(self, obj, key, default=""):
        """Return ``str(obj[key])``; ``default`` when ``obj`` is falsy or the value is missing/None."""
        value = obj.get(key) if obj else None
        return str(value) if value is not None else default

    async def store_data(self, data: Dict[str, Any], data_type: DataType, platform: PlatformType,
                         check_duplicate: bool = True) -> Dict[str, Any]:
        """Store one record using the JSON storage scheme.

        Args:
            data: Raw record to store.
            data_type: Data type (job/company).
            platform: Platform (boss/qcwy/zhilian).
            check_duplicate: When True, skip the insert if the platform-specific dedup key
                already exists in the table.

        Returns:
            A result dict with ``success``, ``message`` and ``duplicate`` keys. Errors are
            logged and reported as ``success=False`` with an ``error`` key; nothing is raised.
        """
        try:
            return await self._store_data_as_json(data, data_type, platform, check_duplicate)
        except Exception as e:
            logger.error(f"{platform} {data_type} 数据存储失败: {str(e)}")
            return {
                "success": False,
                "message": f"数据存储失败: {str(e)}",
                "duplicate": False,
                "error": str(e)
            }

    def _get_json_table_name(self, data_type: DataType, platform: PlatformType) -> str:
        """Return the JSON table name ``<platform>_<data_type>`` (without the database prefix)."""
        return f"{platform.value}_{data_type.value}"

    @staticmethod
    def _extract_company_name(data: Dict[str, Any], platform: PlatformType) -> Optional[Any]:
        """Return the value used as the company dedup key for ``platform`` (None if absent)."""
        if platform == PlatformType.BOSS:
            # `or {}` guards against an explicit None for companyFullInfoVO.
            return data.get('name') or (data.get('companyFullInfoVO') or {}).get('name')
        if platform == PlatformType.QCWY:
            return data.get('companyName') or data.get('company_name')
        if platform == PlatformType.ZHILIAN:
            return data.get('companyName') or data.get('name')
        return None

    @staticmethod
    def _duplicate_result(table_name: str) -> Dict[str, Any]:
        """Build the result dict returned when a record is skipped as a duplicate."""
        return {
            "success": True,
            "message": "数据重复,跳过插入",
            "duplicate": True,
            "table_name": table_name,
            "storage_type": "json"
        }

    async def _is_duplicate_record(self, table_name: str, data: Dict[str, Any],
                                   data_type: DataType, platform: PlatformType) -> bool:
        """Run the platform-specific duplicate check for one record.

        Returns True only when a matching row exists. Records missing their dedup key(s)
        are never treated as duplicates, and lookup failures count as "not a duplicate".
        """
        if data_type == DataType.JOB:
            if platform == PlatformType.QCWY:
                # QCWY: jobId + updateDateTime
                job_id = data.get('jobId')
                update_date_time = data.get('updateDateTime')
                if job_id and update_date_time and await self._check_qcwy_duplicate(
                        table_name, job_id, update_date_time):
                    logger.info(f"QCWY职位数据重复,跳过插入: jobId={job_id}, updateDateTime={update_date_time}")
                    return True
            elif platform == PlatformType.BOSS:
                # BOSS: jobBaseInfoVO.jobId (`or {}` guards against an explicit None)
                job_id = (data.get('jobBaseInfoVO') or {}).get('jobId')
                if job_id and await self._check_boss_duplicate(table_name, job_id):
                    logger.info(f"BOSS职位数据重复,跳过插入: jobId={job_id}")
                    return True
            elif platform == PlatformType.ZHILIAN:
                # Zhilian: number + firstPublishTime
                number = data.get('number')
                first_publish_time = data.get('firstPublishTime')
                if number and first_publish_time and await self._check_zhilian_duplicate(
                        table_name, number, first_publish_time):
                    logger.info(
                        f"智联职位数据重复,跳过插入: number={number}, firstPublishTime={first_publish_time}")
                    return True
            return False

        if data_type == DataType.COMPANY:
            company_name = self._extract_company_name(data, platform)
            if not company_name:
                return False
            company_checkers = {
                PlatformType.BOSS: (self._check_boss_company_duplicate, "BOSS"),
                PlatformType.QCWY: (self._check_qcwy_company_duplicate, "QCWY"),
                PlatformType.ZHILIAN: (self._check_zhilian_company_duplicate, "智联"),
            }
            entry = company_checkers.get(platform)
            if entry is not None and await entry[0](table_name, company_name):
                logger.info(f"{entry[1]}公司数据重复,跳过插入: companyName={company_name}")
                return True
        return False

    async def _store_data_as_json(self, data: Dict[str, Any], data_type: DataType, platform: PlatformType,
                                  check_duplicate: bool = True) -> Dict[str, Any]:
        """Store one record as JSON in its platform table.

        Steps: push job records to the remote server, optionally skip duplicates, then
        insert the JSON payload together with the platform's dedup columns.

        Raises:
            Exception: any storage error is logged and re-raised (``store_data`` catches it).
        """
        try:
            json_table_name = self._get_json_table_name(data_type, platform)

            # NOTE(review): the remote push happens before the duplicate check, so a duplicate
            # record is pushed again on every call (batch_store_data behaves the same way).
            remote_data = await self._prepare_remote_push_data(data, data_type, platform)
            if remote_data:
                await self.send_to_remote_server(remote_data)

            if check_duplicate and await self._is_duplicate_record(json_table_name, data, data_type, platform):
                return self._duplicate_result(json_table_name)

            current_time = datetime.now()
            json_data = {
                # NOTE(review): the original comment said "auto-generated", but a literal 0 is
                # inserted; confirm how ids are assigned for these tables.
                'id': 0,
                'json_data': json.dumps(data, ensure_ascii=False),
                'created_at': current_time,
                'updated_at': current_time
            }
            self._add_dedup_fields(json_data, data, data_type, platform)

            await self._insert_data_to_clickhouse(json_table_name, json_data)
            logger.info(f"{platform} {data_type} 数据以JSON格式存储成功到表 {json_table_name}")
            return {
                "success": True,
                "message": "JSON数据存储成功",
                "duplicate": False,
                "table_name": json_table_name,
                "storage_type": "json"
            }
        except Exception as e:
            logger.error(f"JSON数据存储失败: {str(e)}")
            raise

    async def query_json_data(self, platform: PlatformType, data_type: DataType,
                              json_fields: Optional[Dict[str, str]] = None,
                              limit: int = 100, offset: int = 0) -> Dict[str, Any]:
        """Query rows stored as JSON for one (platform, data type) table, newest first.

        Args:
            platform: Platform (required).
            data_type: Data type (required).
            json_fields: Optional ``{alias: json_key}`` mapping. Each key is extracted with
                ``JSONExtractString`` and returned under ``alias``. Aliases must be plain SQL
                identifiers; keys are sent as bound query parameters.
            limit: Maximum number of rows to return.
            offset: Number of rows to skip.

        Returns:
            On success ``{"success": True, "data", "columns", "count", "table_name"}``, where
            ``count`` is the table's total row count and each ``data`` item also has the
            parsed ``json_data`` object merged in when present. On any failure (including an
            invalid alias) ``{"success": False, "message", "error"}``.
        """
        try:
            json_table_name = self._get_json_table_name(data_type, platform)
            # Coerce paging values to int so they can never smuggle SQL into the query.
            limit = int(limit)
            offset = int(offset)

            count_query = f"SELECT count() FROM job_data.{json_table_name}"
            count_result = await self.clickhouse_client.query(count_query)
            total_count = count_result.result_rows[0][0] if count_result.result_rows else 0

            parameters: Dict[str, str] = {}
            if json_fields:
                select_fields = ['created_at']
                for index, (alias, json_path) in enumerate(json_fields.items()):
                    if not _SQL_IDENTIFIER_RE.fullmatch(str(alias)):
                        raise ValueError(f"非法的字段别名: {alias}")
                    param_name = f"path{index}"
                    parameters[param_name] = str(json_path)
                    select_fields.append(
                        f"JSONExtractString(json_data, {{{param_name}:String}}) AS {alias}")
                query = f"SELECT {', '.join(select_fields)} FROM job_data.{json_table_name}"
            else:
                # No fields requested: return every column.
                query = f"SELECT * FROM job_data.{json_table_name}"

            query += f" ORDER BY created_at DESC LIMIT {limit} OFFSET {offset}"

            result = await self.clickhouse_client.query(query, parameters=parameters or None)

            data = []
            for row in result.result_rows:
                item = dict(zip(result.column_names, row))
                raw_json = item.get('json_data')
                if isinstance(raw_json, str):
                    try:
                        json_content = json.loads(raw_json)
                    except ValueError:
                        # Malformed JSON: keep the row without merged fields.
                        json_content = None
                    if isinstance(json_content, dict):
                        item.update(json_content)
                data.append(item)

            logger.info(f"JSON数据查询成功,从表 {json_table_name} 返回 {len(result.result_rows)} 条记录")
            return {
                "success": True,
                "data": data,
                "columns": result.column_names,
                "count": total_count,
                "table_name": json_table_name
            }
        except Exception as e:
            logger.error(f"JSON数据查询失败: {str(e)}")
            return {
                "success": False,
                "message": f"查询失败: {str(e)}",
                "error": str(e)
            }

    async def _insert_data_to_clickhouse(self, table_name: str, data: Dict[str, Any]) -> None:
        """Insert one row into ``job_data.<table_name>``.

        Only the keys present in ``data`` are sent as columns.

        Raises:
            Exception: insert errors are logged and re-raised.
        """
        try:
            columns = list(data.keys())
            values = [[data.get(col) for col in columns]]
            await self.clickhouse_client.insert(f"job_data.{table_name}", values, column_names=columns)
        except Exception as e:
            logger.error(f"向表 {table_name} 插入数据失败: {str(e)}")
            raise

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10), reraise=True)
    async def _query_first_match(self, table_name: str, conditions: Dict[str, Any]) -> Optional[Any]:
        """Return the first ``(id, created_at)`` row whose columns equal ``conditions``, else None.

        Values are bound as String query parameters. Column names are interpolated and must
        be trusted constants. Exceptions propagate, so the tenacity retry (3 attempts,
        exponential back-off) takes effect; after the last attempt the original error is
        re-raised.
        """
        where_clause = " AND ".join(
            f"{column} = {{p{index}:String}}" for index, column in enumerate(conditions)
        )
        parameters = {f"p{index}": str(value) for index, value in enumerate(conditions.values())}
        query = f"SELECT id, created_at FROM job_data.{table_name} WHERE {where_clause} LIMIT 1"
        result = await self.clickhouse_client.query(query, parameters=parameters)
        return result.result_rows[0] if result.result_rows else None

    async def _check_qcwy_duplicate(self, table_name: str, job_id: str, update_date_time: str) -> Optional[
        Dict[str, Any]]:
        """Find an existing QCWY job row by job_id + update_date_time.

        Returns ``{"id", "created_at"}`` on a hit; None on a miss or when the lookup keeps failing.
        """
        try:
            row = await self._query_first_match(
                table_name, {"job_id": job_id, "update_date_time": update_date_time})
        except Exception as e:
            logger.error(f"检查QCWY重复数据失败: {str(e)}")
            return None
        if row is None:
            return None
        logger.info(f"发现QCWY重复数据: jobId={job_id}, updateDateTime={update_date_time}")
        return {"id": row[0], "created_at": row[1]}

    async def _check_zhilian_duplicate(self, table_name: str, number: str, first_publish_time: str) -> Optional[
        Dict[str, Any]]:
        """Find an existing Zhilian job row by number + first_publish_time.

        Returns ``{"id", "created_at", "number", "first_publish_time"}`` on a hit; None on a
        miss or when the lookup keeps failing.
        """
        try:
            row = await self._query_first_match(
                table_name, {"number": number, "first_publish_time": first_publish_time})
        except Exception as e:
            logger.error(f"检查智联重复数据失败: {str(e)}")
            return None
        if row is None:
            return None
        logger.info(f"发现智联重复数据: number={number}, firstPublishTime={first_publish_time}")
        return {
            "id": row[0],
            "created_at": row[1],
            "number": number,
            "first_publish_time": first_publish_time
        }

    async def _check_boss_duplicate(self, table_name: str, job_id: Any) -> Optional[Dict[str, Any]]:
        """Find an existing BOSS job row by job_id (None on a miss or persistent lookup failure)."""
        try:
            row = await self._query_first_match(table_name, {"job_id": job_id})
        except Exception as e:
            logger.error(f"检查BOSS重复数据失败: {str(e)}")
            return None
        if row is None:
            return None
        logger.info(f"发现BOSS重复数据: jobId={job_id}")
        return {"id": row[0], "created_at": row[1]}

    async def _check_company_duplicate(self, table_name: str, company_name: str,
                                       label: str) -> Optional[Dict[str, Any]]:
        """Find an existing company row by company_name; ``label`` names the platform in logs."""
        try:
            row = await self._query_first_match(table_name, {"company_name": company_name})
        except Exception as e:
            logger.error(f"检查{label}公司重复数据失败: {str(e)}")
            return None
        if row is None:
            return None
        logger.info(f"发现{label}公司重复数据: companyName={company_name}")
        return {"id": row[0], "created_at": row[1]}

    async def _check_boss_company_duplicate(self, table_name: str, company_name: str) -> Optional[Dict[str, Any]]:
        """Find an existing BOSS company row by company_name."""
        return await self._check_company_duplicate(table_name, company_name, "BOSS")

    async def _check_qcwy_company_duplicate(self, table_name: str, company_name: str) -> Optional[Dict[str, Any]]:
        """Find an existing QCWY company row by company_name."""
        return await self._check_company_duplicate(table_name, company_name, "QCWY")

    async def _check_qcwy_company_duplicate_by_name(self, table_name: str, company_name: str) -> Optional[
        Dict[str, Any]]:
        """Alias of ``_check_qcwy_company_duplicate``, kept for existing callers."""
        return await self._check_qcwy_company_duplicate(table_name, company_name)

    async def _check_zhilian_company_duplicate(self, table_name: str, company_name: str) -> Optional[Dict[str, Any]]:
        """Find an existing Zhilian company row by company_name."""
        return await self._check_company_duplicate(table_name, company_name, "智联")

    async def send_to_remote_server(self, data: Dict[str, Any]) -> bool:
        """POST one prepared record as JSON to the remote data-push endpoint.

        The request is signed with ``md5(salt + timestamp)``. It runs in the default executor
        because ``requests`` is blocking.

        Args:
            data: Payload built by ``_prepare_remote_push_data``.

        Returns:
            True on HTTP 200, False on any other status or exception (errors are logged, never raised).
        """
        source_type = data.get('source_type', '未知平台')
        title = data.get('title', '未知职位')
        company_name = data.get('company_name', data.get('name', '未知公司'))
        logger.info(f"📤 上报数据: [{source_type}] {title} - {company_name}")

        try:
            from_id = 9910056
            timestamp = int(time.time())
            # NOTE(review): hard-coded signing secret; consider moving it to configuration.
            salt = 'jWcIqJK6QlR2syb6HQgpel9iOoOkj01G5MDFNtQLaTxhddHUTEnURsMe2RxCTYC8'
            token = hashlib.md5((salt + str(timestamp)).encode()).hexdigest()

            url = f'http://external-data.qixin.com/extend/extend_data_push?from={from_id}&token={token}&time={timestamp}'
            headers = {
                'Content-Type': 'application/json',
                'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36'
            }

            # requests is synchronous; offload it so the event loop is not blocked for up to
            # the 30 s timeout.
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                functools.partial(requests.post, url, json=data, headers=headers, timeout=30),
            )
            if response.status_code == 200:
                return True
            logger.error(f"❌ 数据发送失败: {response.status_code} - {response.text[:100]}")
            return False
        except Exception as e:
            logger.error(f"❌ 发送异常: {str(e)}")
            return False

    async def batch_store_data(self, data_list: List[Dict[str, Any]], data_type: DataType, platform: PlatformType,
                               check_duplicate: bool = True) -> Dict[str, Any]:
        """Store many records with a single batched insert.

        Args:
            data_list: Records to store.
            data_type: Data type (job/company).
            platform: Platform (boss/qcwy/zhilian).
            check_duplicate: When True, rows whose dedup key already exists (in the table or
                earlier in the batch) are skipped.

        Returns:
            ``{"total", "success", "failed", "duplicate", "errors"}``. If an unexpected error
            escapes the batch path, falls back to storing records one by one.
        """
        results = {
            "total": len(data_list),
            "success": 0,
            "failed": 0,
            "duplicate": 0,
            "errors": []
        }

        if not data_list:
            return results

        try:
            json_table_name = self._get_json_table_name(data_type, platform)
            valid_data_list = []
            remote_push_data_list = []

            # Step 1: build insert rows and remote payloads.
            for i, data in enumerate(data_list):
                try:
                    current_time = datetime.now()
                    json_data = {
                        'id': 0,  # NOTE(review): literal 0 is inserted; confirm id assignment.
                        'json_data': json.dumps(data, ensure_ascii=False),
                        'created_at': current_time,
                        'updated_at': current_time
                    }
                    self._add_dedup_fields(json_data, data, data_type, platform)
                    valid_data_list.append(json_data)

                    remote_data = await self._prepare_remote_push_data(data, data_type, platform)
                    if remote_data:
                        remote_push_data_list.append(remote_data)
                except Exception as e:
                    results["failed"] += 1
                    results["errors"].append({
                        "index": i,
                        "error": f"数据预处理失败: {str(e)}"
                    })

            # Step 2: batched insert; duplicates are filtered out during the insert.
            if valid_data_list:
                try:
                    insert_result = await self._batch_insert_to_clickhouse(json_table_name, valid_data_list,
                                                                           ignore_duplicates=check_duplicate)
                    results["success"] = insert_result["inserted"]
                    results["duplicate"] = insert_result["ignored"]
                except Exception as e:
                    logger.error(f"批量插入失败: {str(e)}")
                    # Add to (not overwrite) the pre-processing failures counted in step 1.
                    results["failed"] += len(valid_data_list)
                    results["errors"].append({
                        "error": f"批量插入失败: {str(e)}"
                    })

            # Step 3: push to the remote server; failures here do not affect the storage result.
            if remote_push_data_list:
                try:
                    await self._batch_send_to_remote_server(remote_push_data_list)
                    logger.info(f"批量推送到远程服务器成功: {len(remote_push_data_list)} 条数据")
                except Exception as e:
                    logger.warning(f"批量推送到远程服务器失败: {str(e)}")

        except Exception as e:
            logger.error(f"批量存储数据失败: {str(e)}")
            return await self._fallback_individual_store(data_list, data_type, platform, check_duplicate)

        return results

    def _add_dedup_fields(self, json_data: Dict[str, Any], data: Dict[str, Any], data_type: DataType,
                          platform: PlatformType) -> None:
        """Copy the platform's dedup key(s) from ``data`` into ``json_data`` as string columns.

        Jobs: BOSS ``job_id``; QCWY ``job_id`` + ``update_date_time``; Zhilian ``number`` +
        ``first_publish_time``. Companies: ``company_name``. Missing keys are simply not added.
        """
        if data_type == DataType.JOB:
            if platform == PlatformType.BOSS:
                # `or {}` guards against an explicit None for jobBaseInfoVO.
                job_base_info = data.get('jobBaseInfoVO') or {}
                if 'jobId' in job_base_info:
                    json_data['job_id'] = str(job_base_info['jobId'])
            elif platform == PlatformType.QCWY:
                if 'jobId' in data:
                    json_data['job_id'] = str(data['jobId'])
                if 'updateDateTime' in data:
                    json_data['update_date_time'] = str(data['updateDateTime'])
            elif platform == PlatformType.ZHILIAN:
                if 'number' in data:
                    json_data['number'] = str(data['number'])
                if 'firstPublishTime' in data:
                    json_data['first_publish_time'] = str(data['firstPublishTime'])
        elif data_type == DataType.COMPANY:
            company_name = self._extract_company_name(data, platform)
            if company_name:
                json_data['company_name'] = str(company_name)

    async def _prepare_remote_push_data(self, data: Dict[str, Any], data_type: DataType, platform: PlatformType) -> \
            Optional[Dict[str, Any]]:
        """Map a raw job record to the remote push payload for its platform.

        Returns None for non-job records, unknown platforms, or when mapping fails (logged).
        """
        if data_type != DataType.JOB:
            return None

        try:
            if platform == PlatformType.QCWY:
                welfare_list = data.get("jobWelfareCodeDataList")
                if isinstance(welfare_list, list):
                    welfare_str = ",".join(
                        str(item.get("chineseTitle") or item.get("typeTitle") or item.get("englishTitle") or item.get(
                            "code"))
                        for item in welfare_list if isinstance(item, dict)
                    )
                elif isinstance(welfare_list, str):
                    welfare_str = welfare_list.replace("[", "").replace("]", "")
                else:
                    welfare_str = ""

                raw_location = data.get("location") or ""
                if not raw_location:
                    work_loc = data.get("workLocation") or {}
                    raw_location = work_loc.get("workAddress") or work_loc.get("address") or ""
                location_val = raw_location if raw_location else "位置信息未找到"

                raw_area = data.get("jobAreaString") or ""
                if not raw_area:
                    level_detail = data.get("jobAreaLevelDetail") or {}
                    city_str = level_detail.get("cityString") or ""
                    landmark_str = level_detail.get("landMarkString") or ""
                    raw_area = f"{city_str}{landmark_str}".strip()
                area_val = raw_area if raw_area else "位置信息未找到"

                return {
                    'source_type': '前程无忧',
                    'name': data.get("companyName"),
                    'title': data.get("jobName"),
                    'title_addr': data.get("jobName"),
                    'description': data.get("jobDescribe"),
                    'age': "",
                    'sex': "",
                    'number': "",
                    'education': data.get("degreeString"),
                    'skill': await self.safe_join(data.get("jobTagsForOrder")),
                    'welfare': welfare_str,
                    'years': data.get("workYearString"),
                    # min-max, consistent with the BOSS "low-high" format (was emitted reversed).
                    'salary': f'{data.get("jobSalaryMin", "")}-{data.get("jobSalaryMax", "")}',
                    'location': location_val,
                    'position': area_val,
                    'date': data.get("confirmDateString"),
                    'start_date': data.get("confirmDateString"),
                    'end_date': "",
                    'job_type': data.get("termStr"),
                    'size': data.get("companySizeString"),
                    'employer_type': data.get("companyTypeString"),
                    'industry': f'{data.get("major1Str", "")}-{data.get("major2Str", "")}',
                    'job_1st_class': "",
                    'job_2nd_class': "",
                    'job_3rd_class': "",
                    'job_4th_class': "",
                    'url': data.get("jobHref"),
                    'company_id': data.get("coId"),
                    'company_name': data.get("fullCompanyName"),
                    'company_url': data.get("companyHref"),
                    'company_desc': data.get("company_desc", ""),
                    'base_data': data
                }

            if platform == PlatformType.BOSS:
                bossBaseInfoVO = data.get("bossBaseInfoVO", {})
                jobBaseInfoVO = data.get("jobBaseInfoVO", {})
                brandComInfoVO = data.get("brandComInfoVO", {})
                return {
                    'source_type': 'Boss直聘',
                    'name': await self.safe_get(brandComInfoVO, "brandName"),
                    'common_name': await self.safe_get(bossBaseInfoVO, "brandName"),
                    'title': await self.safe_get(jobBaseInfoVO, "positionName"),
                    'title_addr': await self.safe_get(jobBaseInfoVO, "positionName"),
                    'description': await self.safe_get(jobBaseInfoVO, "jobDesc"),
                    'education': await self.safe_get(jobBaseInfoVO, "degreeName"),
                    'skill': await self.safe_join(jobBaseInfoVO.get("requiredSkills") if jobBaseInfoVO else None),
                    'welfare': await self.safe_join(jobBaseInfoVO.get("salaryWelfareInfo") if jobBaseInfoVO else None),
                    'years': await self.safe_get(jobBaseInfoVO, "experienceName"),
                    'salary': f'{await self.safe_get(jobBaseInfoVO, "lowSalary")}-{await self.safe_get(jobBaseInfoVO, "highSalary")}',
                    'location': await self.safe_get(jobBaseInfoVO, "locationName", "位置信息未找到"),
                    'position': await self.safe_get(jobBaseInfoVO, "locationDesc", "位置信息未找到"),
                    'job_type': "全职",
                    'size': await self.safe_get(brandComInfoVO, "scaleName"),
                    'employer_type': "全职",
                    'industry': await self.safe_get(brandComInfoVO, "industryName"),
                    'job_1st_class': "",
                    'job_2nd_class': "",
                    'job_3rd_class': "",
                    'job_4th_class': "",
                    'date': "",
                    'start_date': "",
                    'end_date': "",
                    'age': "",
                    'sex': "",
                    'number': "",
                    'url': f"https://www.zhipin.com/job_detail/{await self.safe_get(jobBaseInfoVO, 'encryptJobId')}.html",
                    'company_id': await self.safe_get(brandComInfoVO, "encryptBrandId"),
                    'company_name': await self.safe_get(brandComInfoVO, "brandName"),
                    'company_url': f"https://www.zhipin.com/gongsi/{await self.safe_get(brandComInfoVO, 'encryptBrandId')}.html",
                    'company_desc': await self.safe_get(brandComInfoVO, "introduce"),
                    'base_data': data
                }

            if platform == PlatformType.ZHILIAN:
                # Tolerate a missing/None skillLabel and non-dict tags instead of failing the whole push.
                skill_labels = data.get('skillLabel') or []
                skill_values = [tag.get('value') for tag in skill_labels if isinstance(tag, dict)]
                return {
                    'source_type': '智联招聘',
                    'name': await self.safe_get(data, 'companyName'),
                    'common_name': await self.safe_get(data, 'companyName'),
                    'title': await self.safe_get(data, 'name'),
                    'title_addr': await self.safe_get(data, 'name'),
                    'description': await self.safe_get(data, 'jobSummary'),
                    'education': await self.safe_get(data, 'education'),
                    'skill': await self.safe_join(skill_values),
                    'welfare': '',  # Zhilian records carry no welfare field
                    'years': await self.safe_get(data, 'workingExp'),
                    'salary': await self.safe_get(data, 'salary60'),
                    'location': f"{await self.safe_get(data, 'workCity')}{await self.safe_get(data, 'cityDistrict')}",
                    'position': f"{await self.safe_get(data, 'workCity')}{await self.safe_get(data, 'cityDistrict')}",
                    'job_type': await self.safe_get(data, 'workType'),
                    'size': await self.safe_get(data, 'companySize'),
                    'employer_type': await self.safe_get(data, 'propertyName'),
                    'industry': await self.safe_get(data, 'industryName'),
                    'job_1st_class': '',
                    'job_2nd_class': '',
                    'job_3rd_class': '',
                    'job_4th_class': '',
                    'date': await self.safe_get(data, 'firstPublishTime'),
                    'start_date': '',
                    'end_date': '',
                    'age': '',
                    'sex': '',
                    'number': await self.safe_get(data, 'recruitNumber'),
                    'url': await self.safe_get(data, 'positionURL'),
                    'company_id': await self.safe_get(data, 'companyId'),
                    'company_name': await self.safe_get(data, 'companyName'),
                    'company_url': await self.safe_get(data, 'companyUrl'),
                    'company_desc': await self.safe_get(data, 'companyDesc'),
                    'base_data': data
                }

        except Exception as e:
            logger.error(f"准备远程推送数据失败: {str(e)}")
        return None

    async def _filter_existing_rows(self, table_name: str, data_list: List[Dict[str, Any]],
                                    dedup_cols: List[str]) -> List[Dict[str, Any]]:
        """Drop rows whose dedup key already exists in the table or earlier in this batch.

        Existing keys are fetched with one query filtered on the first dedup column. Rows
        missing some dedup value are never deduplicated against each other.
        """
        lead_col = dedup_cols[0]
        candidate_keys = list({str(d.get(lead_col, "")) for d in data_list if d.get(lead_col)})

        existing_keys = set()
        if candidate_keys:
            query = f"""
                SELECT {', '.join(dedup_cols)}
                FROM job_data.{table_name}
                WHERE {lead_col} IN {{keys:Array(String)}}
            """
            existing = await self.clickhouse_client.query(query, parameters={"keys": candidate_keys})
            existing_keys = {tuple(str(value) for value in row) for row in existing.result_rows}

        kept = []
        seen_in_batch = set()
        for item in data_list:
            key = tuple(str(item.get(col, "")) for col in dedup_cols)
            if key in existing_keys:
                continue
            if all(item.get(col) for col in dedup_cols):
                if key in seen_in_batch:
                    continue
                seen_in_batch.add(key)
            kept.append(item)
        return kept

    async def _batch_insert_to_clickhouse(self, table_name: str, data_list: List[Dict[str, Any]],
                                          ignore_duplicates: bool = True) -> Dict[str, int]:
        """Batch-insert rows into ``job_data.<table_name>``, optionally skipping duplicates.

        Args:
            table_name: Table name (without database prefix).
            data_list: Rows to insert.
            ignore_duplicates: When True, skip rows whose dedup key exists in the table or
                earlier in the batch.

        Returns:
            ``{"inserted": n, "ignored": m}``.

        Raises:
            Exception: query/insert errors are logged and re-raised.
        """
        result = {"inserted": 0, "ignored": 0}
        if not data_list:
            return result

        try:
            rows_to_insert = data_list
            if ignore_duplicates:
                dedup_cols = self._get_dedup_columns_for_table(table_name)
                if dedup_cols:
                    rows_to_insert = await self._filter_existing_rows(table_name, data_list, dedup_cols)

            # Rows may carry different optional dedup columns. Insert each column layout
            # separately so an omitted column is handled as in the single-row insert path,
            # instead of being dropped (or sent as None) for the whole batch.
            batches: Dict[tuple, List[List[Any]]] = {}
            for item in rows_to_insert:
                columns = tuple(item.keys())
                batches.setdefault(columns, []).append([item[col] for col in columns])
            for columns, values in batches.items():
                await self.clickhouse_client.insert(f"job_data.{table_name}", values, column_names=list(columns))

            result["inserted"] = len(rows_to_insert)
            result["ignored"] = len(data_list) - result["inserted"]
        except Exception as e:
            logger.error(f"批量插入到表 {table_name} 失败: {str(e)}")
            raise

        return result

    def _get_dedup_columns_for_table(self, table_name: str) -> List[str]:
        """Return the dedup column(s) for a known JSON table, or ``[]`` for unknown tables."""
        dedup_columns = {
            "boss_job": ["job_id"],
            "qcwy_job": ["job_id", "update_date_time"],
            "zhilian_job": ["number", "first_publish_time"],
            "boss_company": ["company_name"],
            "qcwy_company": ["company_name"],
            "zhilian_company": ["company_name"],
        }
        return list(dedup_columns.get(table_name, []))

    async def _batch_send_to_remote_server(self, data_list: List[Dict[str, Any]]) -> None:
        """Push payloads one at a time; a failing item is logged and the loop continues."""
        for data in data_list:
            try:
                await self.send_to_remote_server(data)
            except Exception as e:
                logger.error(f"批量推送单条数据失败: {str(e)}")

    async def _fallback_individual_store(self, data_list: List[Dict[str, Any]], data_type: DataType,
                                         platform: PlatformType, check_duplicate: bool) -> Dict[str, Any]:
        """Store records one by one via ``store_data`` and aggregate the outcomes."""
        results = {
            "total": len(data_list),
            "success": 0,
            "failed": 0,
            "duplicate": 0,
            "errors": []
        }

        for i, data in enumerate(data_list):
            try:
                result = await self.store_data(data, data_type, platform, check_duplicate)
                # Duplicates are reported with success=True, so check them first.
                if result.get("duplicate"):
                    results["duplicate"] += 1
                elif result["success"]:
                    results["success"] += 1
                else:
                    results["failed"] += 1
                    results["errors"].append({
                        "index": i,
                        "error": result.get("message", "未知错误")
                    })
            except Exception as e:
                results["failed"] += 1
                results["errors"].append({
                    "index": i,
                    "error": str(e)
                })

        return results


def create_data_router_service(clickhouse_client: AsyncClient) -> DataRouterService:
    """Factory returning a ``DataRouterService`` bound to ``clickhouse_client``."""
    return DataRouterService(clickhouse_client)