JobData/app/api/v1/analytics.py

93 lines
3.1 KiB
Python

from typing import Optional, List
from datetime import datetime, date, timezone
try:
from zoneinfo import ZoneInfo
except ImportError:
from backports.zoneinfo import ZoneInfo
from fastapi import APIRouter, Depends, Query
from app.core.clickhouse import clickhouse_manager
from app.services.analytics_service import AnalyticsService
from app.schemas.analytics import (
JobStatisticsResponse,
)
router = APIRouter()
CHINA_TZ = ZoneInfo("Asia/Shanghai")
async def get_analytics_service() -> AnalyticsService:
client = await clickhouse_manager.get_client()
return AnalyticsService(client)
def to_utc(dt: datetime) -> datetime:
"""将本地(上海)时间转换为UTC"""
if dt.tzinfo is None:
dt = dt.replace(tzinfo=CHINA_TZ)
return dt.astimezone(timezone.utc)
@router.get("/overview", response_model=JobStatisticsResponse, summary="获取职位统计总览")
async def get_overview(
from_date: Optional[date] = None,
to_date: Optional[date] = None,
city: Optional[str] = None,
service: AnalyticsService = Depends(get_analytics_service)
):
from_dt = to_utc(datetime.combine(from_date, datetime.min.time())) if from_date else None
to_dt = to_utc(datetime.combine(to_date, datetime.max.time())) if to_date else None
filters = {}
if city:
filters["city"] = city
return await service.get_job_statistics(filters=filters, from_dt=from_dt, to_dt=to_dt)
@router.get("/trend/volume", summary="获取数据量趋势")
async def get_volume_trend(
interval: str = Query("day", regex="^(day|hour|week|month)$"),
from_date: Optional[date] = None,
to_date: Optional[date] = None,
from_datetime: Optional[datetime] = None,
to_datetime: Optional[datetime] = None,
service: AnalyticsService = Depends(get_analytics_service)
):
# 兼容小时粒度的精确时间窗口,其它粒度按日期转换为起止时间
if from_datetime:
from_dt = to_utc(from_datetime)
elif from_date:
from_dt = to_utc(datetime.combine(from_date, datetime.min.time()))
else:
from_dt = None
if to_datetime:
to_dt = to_utc(to_datetime)
elif to_date:
to_dt = to_utc(datetime.combine(to_date, datetime.max.time()))
else:
to_dt = None
return await service.get_volume_trend(interval=interval, from_dt=from_dt, to_dt=to_dt)
@router.get("/distribution/source", summary="获取数据来源分布")
async def get_source_distribution(
from_date: Optional[date] = None,
to_date: Optional[date] = None,
from_datetime: Optional[datetime] = None,
to_datetime: Optional[datetime] = None,
service: AnalyticsService = Depends(get_analytics_service)
):
if from_datetime:
from_dt = to_utc(from_datetime)
elif from_date:
from_dt = to_utc(datetime.combine(from_date, datetime.min.time()))
else:
from_dt = None
if to_datetime:
to_dt = to_utc(to_datetime)
elif to_date:
to_dt = to_utc(datetime.combine(to_date, datetime.max.time()))
else:
to_dt = None
return await service.get_source_distribution(from_dt=from_dt, to_dt=to_dt)