from clickhouse_connect import get_async_client from clickhouse_connect.driver import AsyncClient as AsyncClickHouseClient from app.settings.config import settings import urllib3 from typing import Any, Dict async def get_clickhouse_client() -> AsyncClickHouseClient: """获取ClickHouse异步客户端""" # 创建自定义连接池管理器,适配多worker模式 # 考虑到多worker环境,每个worker的连接池应该适中 # maxsize参数控制每个主机的连接池大小,block=True防止连接池溢出 pool_mgr = urllib3.PoolManager( num_pools=2, # 连接池数量,减少以适配多worker maxsize=5, # 每个连接池的最大连接数,每个worker最多10个连接 block=True # 当连接池满时阻塞而不是创建新连接 ) return await get_async_client( host=settings.CLICKHOUSE_HOST, username=settings.CLICKHOUSE_USER, password=settings.CLICKHOUSE_PASS, database=settings.CLICKHOUSE_DB, port=settings.CLICKHOUSE_PORT, pool_mgr=pool_mgr, connect_timeout=30, send_receive_timeout=120 ) class ClickHouseManager: """ClickHouse连接管理器""" def __init__(self): self._client: AsyncClickHouseClient = None async def get_client(self) -> AsyncClickHouseClient: """获取或创建ClickHouse客户端""" if self._client is None: self._client = await get_clickhouse_client() return self._client async def execute(self, query: str, parameters: Dict[str, Any] = None): """执行SQL查询""" client = await self.get_client() return await client.query(query, parameters=parameters) async def close(self): """关闭ClickHouse连接""" if self._client: close_result = self._client.close() if close_result is not None: await close_result self._client = None # 全局ClickHouse管理器实例 clickhouse_manager = ClickHouseManager()