JobData/app/services/analytics_service.py

59 lines
1.8 KiB
Python

from datetime import datetime
from typing import Optional, Dict, Any, List
from clickhouse_connect.driver import AsyncClient
from app.repositories.clickhouse_repo import JobAnalyticsRepo
class AnalyticsService:
"""数据分析服务"""
def __init__(self, clickhouse_client: AsyncClient):
self.job_repo = JobAnalyticsRepo(clickhouse_client)
async def get_job_statistics(
self,
filters: Optional[Dict[str, Any]] = None,
from_dt: Optional[datetime] = None,
to_dt: Optional[datetime] = None,
) -> Dict[str, Any]:
"""获取职位统计信息(仅返回总量)"""
total_jobs = await self.job_repo.get_job_count(
filters=filters, from_dt=from_dt, to_dt=to_dt
)
return {
"total_jobs": total_jobs,
"period": {
"from_date": from_dt.isoformat() if from_dt else None,
"to_date": to_dt.isoformat() if to_dt else None
}
}
async def get_volume_trend(
self,
interval: str = "day",
filters: Optional[Dict[str, Any]] = None,
from_dt: Optional[datetime] = None,
to_dt: Optional[datetime] = None,
) -> List[Dict[str, Any]]:
"""获取数据量趋势"""
return await self.job_repo.get_volume_trend(
interval=interval,
filters=filters,
from_dt=from_dt,
to_dt=to_dt
)
async def get_source_distribution(
self,
filters: Optional[Dict[str, Any]] = None,
from_dt: Optional[datetime] = None,
to_dt: Optional[datetime] = None,
) -> List[Dict[str, Any]]:
"""获取数据来源分布"""
return await self.job_repo.get_source_distribution(
filters=filters,
from_dt=from_dt,
to_dt=to_dt
)