225 lines
9.2 KiB
Python

from typing import Dict, Any, List, Optional
from fastapi import HTTPException, BackgroundTasks
from app.services.job import DataRouterService, DataType, PlatformType
from app.log import logger
from pydantic import BaseModel, Field
class UniversalDataRequest(BaseModel):
"""通用数据存储请求模型"""
data: Dict[str, Any] = Field(..., description="要存储的数据")
data_type: DataType = Field(..., description="数据类型 (job/company)")
platform: PlatformType = Field(..., description="平台类型 (boss/qcwy/zhilian)")
check_duplicate: bool = Field(True, description="是否检查重复数据")
class BatchDataRequest(BaseModel):
"""批量数据存储请求模型"""
data_list: List[Dict[str, Any]] = Field(..., description="要存储的数据列表")
data_type: DataType = Field(..., description="数据类型 (job/company)")
platform: PlatformType = Field(..., description="平台类型 (boss/qcwy/zhilian)")
check_duplicate: bool = Field(True, description="是否检查重复数据")
class UniversalDataController:
"""通用数据控制器 - 处理所有平台的数据存储请求"""
def __init__(self, data_router_service: DataRouterService):
self.data_router_service = data_router_service
async def store_single_data(self, request: UniversalDataRequest) -> Dict[str, Any]:
"""存储单条数据"""
try:
# logger.info(f"接收到 {request.platform} {request.data_type} 数据存储请求")
result = await self.data_router_service.store_data(
data=request.data,
data_type=request.data_type,
platform=request.platform,
check_duplicate=request.check_duplicate
)
return {
"code": 200 if result["success"] else 400,
"message": result["message"],
"data": result,
"platform": request.platform,
"data_type": request.data_type
}
except Exception as e:
logger.error(f"存储单条数据失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"数据存储失败: {str(e)}")
async def store_batch_data(self, request: BatchDataRequest) -> Dict[str, Any]:
"""批量存储数据"""
try:
# logger.info(
# f"接收到 {request.platform} {request.data_type} 批量数据存储请求,共 {len(request.data_list)} 条")
result = await self.data_router_service.batch_store_data(
data_list=request.data_list,
data_type=request.data_type,
platform=request.platform,
check_duplicate=request.check_duplicate
)
return {
"code": 200,
"message": f"批量处理完成: 成功 {result['success']} 条,失败 {result['failed']} 条,重复 {result['duplicate']}",
"data": result,
"platform": request.platform,
"data_type": request.data_type
}
except Exception as e:
logger.error(f"批量存储数据失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"批量数据存储失败: {str(e)}")
async def store_single_data_async(self,
background_tasks: BackgroundTasks,
request: UniversalDataRequest) -> Dict[str, Any]:
"""异步存储单条数据"""
try:
# logger.info(f"接收到 {request.platform} {request.data_type} 异步数据存储请求")
# 添加后台任务
background_tasks.add_task(
self._async_store_single_data,
request
)
return {
"code": 202,
"message": "数据已加入异步处理队列",
"platform": request.platform,
"data_type": request.data_type
}
except Exception as e:
logger.error(f"异步存储单条数据失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"异步数据存储失败: {str(e)}")
async def store_batch_data_async(self,
background_tasks: BackgroundTasks,
request: BatchDataRequest) -> Dict[str, Any]:
"""异步批量存储数据"""
try:
# 打印接收日志
platform_name = {"boss": "Boss直聘", "qcwy": "前程无忧", "zhilian": "智联招聘"}.get(request.platform.value, request.platform.value)
logger.info(f"📥 收到批量请求: [{platform_name}] {request.data_type.value} x{len(request.data_list)}")
# 添加后台任务
background_tasks.add_task(
self._async_store_batch_data,
request
)
return {
"code": 202,
"message": f"批量数据已加入异步处理队列,共 {len(request.data_list)}",
"platform": request.platform,
"data_type": request.data_type
}
except Exception as e:
logger.error(f"异步批量存储数据失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"异步批量数据存储失败: {str(e)}")
async def _async_store_single_data(self, request: UniversalDataRequest):
"""异步存储单条数据的后台任务"""
try:
result = await self.data_router_service.store_data(
data=request.data,
data_type=request.data_type,
platform=request.platform,
check_duplicate=request.check_duplicate
)
if result["success"]:
logger.info(f"异步存储 {request.platform} {request.data_type} 数据成功")
else:
logger.warning(f"异步存储 {request.platform} {request.data_type} 数据失败: {result['message']}")
except Exception as e:
logger.error(f"异步存储单条数据后台任务失败: {str(e)}")
async def _async_store_batch_data(self, request: BatchDataRequest):
"""异步批量存储数据的后台任务"""
try:
platform_name = {"boss": "Boss直聘", "qcwy": "前程无忧", "zhilian": "智联招聘"}.get(request.platform.value, request.platform.value)
result = await self.data_router_service.batch_store_data(
data_list=request.data_list,
data_type=request.data_type,
platform=request.platform,
check_duplicate=request.check_duplicate
)
logger.info(f"✅ 批量处理完成: [{platform_name}] 成功 {result['success']} 条, 重复 {result['duplicate']} 条, 失败 {result['failed']}")
except Exception as e:
logger.error(f"异步批量存储数据后台任务失败: {str(e)}")
async def query_data(self, platform: PlatformType, data_type: DataType,
page: int = 1, page_size: int = 20) -> Dict[str, Any]:
"""查询数据"""
try:
logger.info(f"查询 {platform} {data_type} 数据,页码: {page}, 页大小: {page_size}")
offset = (page - 1) * page_size
result = await self.data_router_service.query_json_data(
platform=platform,
data_type=data_type,
limit=page_size,
offset=offset
)
return {
"code": 200,
"message": "查询数据成功",
"data": {
"items": result.get("data", []),
"total": result.get("count", 0),
"page": page,
"page_size": page_size
},
"platform": platform,
"data_type": data_type
}
except Exception as e:
logger.error(f"查询数据失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"查询数据失败: {str(e)}")
async def get_supported_platforms(self) -> Dict[str, Any]:
"""获取支持的平台和数据类型"""
return {
"code": 200,
"message": "获取支持的平台和数据类型成功",
"data": {
"platforms": [platform.value for platform in PlatformType],
"data_types": [data_type.value for data_type in DataType],
"platform_duplicate_keys": {
"boss": {
"job": "job_id",
"company": "company_name"
},
"qcwy": {
"job": "job_id + update_date_time",
"company": "company_name"
},
"zhilian": {
"job": "number + first_publish_time",
"company": "company_name"
}
}
}
}
# 创建控制器实例的工厂函数
def create_universal_data_controller(data_router_service: DataRouterService) -> UniversalDataController:
return UniversalDataController(data_router_service)