54 lines
1.5 KiB
Python
54 lines
1.5 KiB
Python
from dataclasses import dataclass, field
|
|
from typing import Callable, Dict, Any, List, Optional, Tuple
|
|
|
|
from app.log import logger
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class DedupFieldSpec:
|
|
"""去重字段规格:从原始数据中提取去重列值"""
|
|
column: str
|
|
extractor: Callable[[Dict[str, Any]], Optional[str]]
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class PlatformConfig:
|
|
"""平台配置(不可变)"""
|
|
platform: str
|
|
channel: str
|
|
data_type: str
|
|
table: str
|
|
dedup_fields: Tuple[DedupFieldSpec, ...] = ()
|
|
push_mapper: Optional[Callable[[Dict[str, Any]], Optional[Dict[str, Any]]]] = None
|
|
|
|
@property
|
|
def key(self) -> Tuple[str, str, str]:
|
|
return (self.platform, self.channel, self.data_type)
|
|
|
|
@property
|
|
def dedup_columns(self) -> List[str]:
|
|
return [f.column for f in self.dedup_fields]
|
|
|
|
|
|
# 全局注册表
|
|
_REGISTRY: Dict[Tuple[str, str, str], PlatformConfig] = {}
|
|
|
|
|
|
def register(config: PlatformConfig) -> None:
|
|
key = config.key
|
|
if key in _REGISTRY:
|
|
logger.warning(f"覆盖已有注册: {key}")
|
|
_REGISTRY[key] = config
|
|
|
|
|
|
def get_config(platform: str, channel: str, data_type: str) -> PlatformConfig:
|
|
key = (platform, channel, data_type)
|
|
config = _REGISTRY.get(key)
|
|
if config is None:
|
|
raise ValueError(f"未注册的平台配置: platform={platform}, channel={channel}, data_type={data_type}")
|
|
return config
|
|
|
|
|
|
def list_configs() -> List[PlatformConfig]:
|
|
return list(_REGISTRY.values())
|