# Source: JobData/app/api/v1/ingest/ingest.py (46 lines, 1.5 KiB, Python)

from typing import Optional, List, Dict, Any
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field
from clickhouse_connect.driver import AsyncClient
from app.core.clickhouse import clickhouse_manager
from app.services.ingest_service import IngestService
# Router on which the ingest endpoints in this module register via @router.post.
router = APIRouter()
class IngestSingleRequest(BaseModel):
    """Request body accepted by the single-record ingest endpoint."""

    # Required; forwarded unchanged to the ingest service.
    platform: str
    # Required; forwarded unchanged to the ingest service.
    data_type: str
    # Required; the record itself as a free-form mapping.
    data: Dict[str, Any]
    # Optional flag forwarded to the ingest service; defaults to True.
    check_duplicate: bool = True
class IngestBatchRequest(BaseModel):
    """Request body accepted by the batch ingest endpoint."""

    # Required; forwarded unchanged to the ingest service.
    platform: str
    # Required; forwarded unchanged to the ingest service.
    data_type: str
    # Required; the records as a list of free-form mappings.
    data_list: List[Dict[str, Any]]
    # Optional flag forwarded to the ingest service; defaults to True.
    check_duplicate: bool = True
async def get_service() -> IngestService:
    """Build an IngestService around the client returned by clickhouse_manager.

    Used as a FastAPI dependency by the endpoints in this module.
    """
    return IngestService(await clickhouse_manager.get_client())
@router.post("/data")
async def ingest_data(req: IngestSingleRequest, service: IngestService = Depends(get_service)):
    """Store a single record through the ingest service.

    Args:
        req: Request body carrying ``platform``, ``data_type``, ``data`` and
            the ``check_duplicate`` flag.
        service: Ingest service supplied by the ``get_service`` dependency.

    Returns:
        ``{"code": 200, "data": <service result>, "message": "ok"}``.

    Raises:
        HTTPException: Re-raised unchanged when the service raises one; any
            other exception is reported as a 500 whose detail is ``str(e)``.
    """
    try:
        res = await service.store_single(
            req.platform, req.data_type, req.data, req.check_duplicate
        )
    except HTTPException:
        # A deliberate status code from a lower layer must not be flattened
        # into a generic 500 by the catch-all below.
        raise
    except Exception as e:
        # NOTE(review): detail exposes the raw exception text to the client;
        # kept for backward compatibility with existing consumers.
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {"code": 200, "data": res, "message": "ok"}
@router.post("/batch")
async def ingest_batch(req: IngestBatchRequest, service: IngestService = Depends(get_service)):
    """Store a list of records through the ingest service.

    Args:
        req: Request body carrying ``platform``, ``data_type``, ``data_list``
            and the ``check_duplicate`` flag.
        service: Ingest service supplied by the ``get_service`` dependency.

    Returns:
        ``{"code": 200, "data": <service result>, "message": "ok"}``.

    Raises:
        HTTPException: Re-raised unchanged when the service raises one; any
            other exception is reported as a 500 whose detail is ``str(e)``.
    """
    try:
        res = await service.store_batch(
            req.platform, req.data_type, req.data_list, req.check_duplicate
        )
    except HTTPException:
        # A deliberate status code from a lower layer must not be flattened
        # into a generic 500 by the catch-all below.
        raise
    except Exception as e:
        # NOTE(review): detail exposes the raw exception text to the client;
        # kept for backward compatibility with existing consumers.
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {"code": 200, "data": res, "message": "ok"}