from typing import Dict, Any, List, Optional from fastapi import HTTPException, BackgroundTasks from app.services.job import DataRouterService, DataType, PlatformType from app.log import logger from pydantic import BaseModel, Field class UniversalDataRequest(BaseModel): """通用数据存储请求模型""" data: Dict[str, Any] = Field(..., description="要存储的数据") data_type: DataType = Field(..., description="数据类型 (job/company)") platform: PlatformType = Field(..., description="平台类型 (boss/qcwy/zhilian)") check_duplicate: bool = Field(True, description="是否检查重复数据") class BatchDataRequest(BaseModel): """批量数据存储请求模型""" data_list: List[Dict[str, Any]] = Field(..., description="要存储的数据列表") data_type: DataType = Field(..., description="数据类型 (job/company)") platform: PlatformType = Field(..., description="平台类型 (boss/qcwy/zhilian)") check_duplicate: bool = Field(True, description="是否检查重复数据") class UniversalDataController: """通用数据控制器 - 处理所有平台的数据存储请求""" def __init__(self, data_router_service: DataRouterService): self.data_router_service = data_router_service async def store_single_data(self, request: UniversalDataRequest) -> Dict[str, Any]: """存储单条数据""" try: # logger.info(f"接收到 {request.platform} {request.data_type} 数据存储请求") result = await self.data_router_service.store_data( data=request.data, data_type=request.data_type, platform=request.platform, check_duplicate=request.check_duplicate ) return { "code": 200 if result["success"] else 400, "message": result["message"], "data": result, "platform": request.platform, "data_type": request.data_type } except Exception as e: logger.error(f"存储单条数据失败: {str(e)}") raise HTTPException(status_code=500, detail=f"数据存储失败: {str(e)}") async def store_batch_data(self, request: BatchDataRequest) -> Dict[str, Any]: """批量存储数据""" try: # logger.info( # f"接收到 {request.platform} {request.data_type} 批量数据存储请求,共 {len(request.data_list)} 条") result = await self.data_router_service.batch_store_data( data_list=request.data_list, data_type=request.data_type, platform=request.platform, check_duplicate=request.check_duplicate ) return { "code": 200, "message": f"批量处理完成: 成功 {result['success']} 条,失败 {result['failed']} 条,重复 {result['duplicate']} 条", "data": result, "platform": request.platform, "data_type": request.data_type } except Exception as e: logger.error(f"批量存储数据失败: {str(e)}") raise HTTPException(status_code=500, detail=f"批量数据存储失败: {str(e)}") async def store_single_data_async(self, background_tasks: BackgroundTasks, request: UniversalDataRequest) -> Dict[str, Any]: """异步存储单条数据""" try: # logger.info(f"接收到 {request.platform} {request.data_type} 异步数据存储请求") # 添加后台任务 background_tasks.add_task( self._async_store_single_data, request ) return { "code": 202, "message": "数据已加入异步处理队列", "platform": request.platform, "data_type": request.data_type } except Exception as e: logger.error(f"异步存储单条数据失败: {str(e)}") raise HTTPException(status_code=500, detail=f"异步数据存储失败: {str(e)}") async def store_batch_data_async(self, background_tasks: BackgroundTasks, request: BatchDataRequest) -> Dict[str, Any]: """异步批量存储数据""" try: # 打印接收日志 platform_name = {"boss": "Boss直聘", "qcwy": "前程无忧", "zhilian": "智联招聘"}.get(request.platform.value, request.platform.value) logger.info(f"📥 收到批量请求: [{platform_name}] {request.data_type.value} x{len(request.data_list)} 条") # 添加后台任务 background_tasks.add_task( self._async_store_batch_data, request ) return { "code": 202, "message": f"批量数据已加入异步处理队列,共 {len(request.data_list)} 条", "platform": request.platform, "data_type": request.data_type } except Exception as e: logger.error(f"异步批量存储数据失败: {str(e)}") raise HTTPException(status_code=500, detail=f"异步批量数据存储失败: {str(e)}") async def _async_store_single_data(self, request: UniversalDataRequest): """异步存储单条数据的后台任务""" try: result = await self.data_router_service.store_data( data=request.data, data_type=request.data_type, platform=request.platform, check_duplicate=request.check_duplicate ) if result["success"]: logger.info(f"异步存储 {request.platform} {request.data_type} 数据成功") else: logger.warning(f"异步存储 {request.platform} {request.data_type} 数据失败: {result['message']}") except Exception as e: logger.error(f"异步存储单条数据后台任务失败: {str(e)}") async def _async_store_batch_data(self, request: BatchDataRequest): """异步批量存储数据的后台任务""" try: platform_name = {"boss": "Boss直聘", "qcwy": "前程无忧", "zhilian": "智联招聘"}.get(request.platform.value, request.platform.value) result = await self.data_router_service.batch_store_data( data_list=request.data_list, data_type=request.data_type, platform=request.platform, check_duplicate=request.check_duplicate ) logger.info(f"✅ 批量处理完成: [{platform_name}] 成功 {result['success']} 条, 重复 {result['duplicate']} 条, 失败 {result['failed']} 条") except Exception as e: logger.error(f"异步批量存储数据后台任务失败: {str(e)}") async def query_data(self, platform: PlatformType, data_type: DataType, page: int = 1, page_size: int = 20) -> Dict[str, Any]: """查询数据""" try: logger.info(f"查询 {platform} {data_type} 数据,页码: {page}, 页大小: {page_size}") offset = (page - 1) * page_size result = await self.data_router_service.query_json_data( platform=platform, data_type=data_type, limit=page_size, offset=offset ) return { "code": 200, "message": "查询数据成功", "data": { "items": result.get("data", []), "total": result.get("count", 0), "page": page, "page_size": page_size }, "platform": platform, "data_type": data_type } except Exception as e: logger.error(f"查询数据失败: {str(e)}") raise HTTPException(status_code=500, detail=f"查询数据失败: {str(e)}") async def get_supported_platforms(self) -> Dict[str, Any]: """获取支持的平台和数据类型""" return { "code": 200, "message": "获取支持的平台和数据类型成功", "data": { "platforms": [platform.value for platform in PlatformType], "data_types": [data_type.value for data_type in DataType], "platform_duplicate_keys": { "boss": { "job": "job_id", "company": "company_name" }, "qcwy": { "job": "job_id + update_date_time", "company": "company_name" }, "zhilian": { "job": "number + first_publish_time", "company": "company_name" } } } } # 创建控制器实例的工厂函数 def create_universal_data_controller(data_router_service: DataRouterService) -> UniversalDataController: return UniversalDataController(data_router_service)