win 3d202c3486 feat(05): data pipeline optimization (DATA-01, DATA-04)
Plan 01 - DATA-01: 30-day window dedup fix:
- dedup.py: both the single-field and the two-field SQL queries now include
  `AND created_at > now() - INTERVAL 30 DAY`
- tests/ingest/test_dedup.py: 6 mock tests validating 30-day window

Plan 02 - DATA-04: company vs search job channel separation:
- schemas/ingest.py: ChannelType.COMPANY = 'company'
- configs/boss.py: register channel='company' config
- configs/qcwy.py: register channel='company' config
- configs/zhilian.py: register channel='company' config
- company_jobs_sync.py: store_batch(..., 'mini', ...) → (..., 'company', ...)

DATA-02: confirmed as already complete (job.py has the /data/batch-async endpoint)
DATA-03: confirmed as already complete (company_cleaner.py contains the full pipeline)

Full regression: 112 passed (106 existing + 6 new)
2026-03-21 19:50:06 +08:00

89 lines
3.0 KiB
Python

import json
from datetime import datetime
from typing import Dict, Any, List, Tuple
from clickhouse_connect.driver import AsyncClient
from app.log import logger
from app.services.ingest.registry import PlatformConfig
def build_insert_row(
    config: PlatformConfig, data: Dict[str, Any], channel: str,
) -> Tuple[List[str], List[Any]]:
    """Build one ClickHouse insert row that includes the ``channel`` column.

    Returns a ``(columns, values)`` pair whose entries line up by position.
    The fixed leading columns are:

    * ``id`` -- always ``0``;
    * ``json_data`` -- ``data`` serialized with ``json.dumps`` and
      ``ensure_ascii=False`` (non-ASCII text is kept as-is);
    * ``channel`` -- the ``channel`` argument, unchanged;
    * ``created_at`` / ``updated_at`` -- the same naive local
      ``datetime.now()`` value.

    Then one column is appended per entry of ``config.dedup_fields``, named
    ``spec.column``. Its value is ``str(spec.extractor(data))``, or ``""``
    when the extractor result is falsy.
    """
    stamp = datetime.now()
    names: List[str] = ["id", "json_data", "channel", "created_at", "updated_at"]
    cells: List[Any] = [0, json.dumps(data, ensure_ascii=False), channel, stamp, stamp]

    for field_spec in config.dedup_fields:
        extracted_value = field_spec.extractor(data)
        names.append(field_spec.column)
        # Falsy results (None, "", 0, ...) are stored as an empty string.
        cells.append("" if not extracted_value else str(extracted_value))

    return names, cells
async def batch_dedup_filter(
    client: "AsyncClient",
    config: "PlatformConfig",
    rows: List[Dict[str, Any]],
    all_columns: List[str],
    all_values: List[List[Any]],
    *,
    window_days: int = 30,
) -> Tuple[List[List[Any]], int]:
    """Filter out rows whose dedup key already exists in ClickHouse.

    The dedup key is the tuple of values found in ``config.dedup_columns``,
    which may hold any number (>= 1) of columns. Only the leading key column
    is sent to the server, as an ``IN`` list. The full key tuple is then
    compared in Python against rows of ``job_data.<config.table>`` whose
    ``created_at`` falls within the last ``window_days`` days. All values
    are compared as ``str``.

    Args:
        client: ClickHouse async client used to run the lookup query.
        config: platform config; ``dedup_columns`` and ``table`` are read.
        rows: not read by this function; kept for call-site compatibility.
        all_columns: column names in the same order as the values of each
            row in ``all_values``; must contain every dedup column.
        all_values: candidate insert rows (one list of values per row).
        window_days: look-back window, in days, for existing rows
            (keyword-only, default 30).

    Returns:
        ``(kept_rows, ignored_count)``. When there are no dedup columns, no
        rows, or no row has a non-empty leading key, ``all_values`` itself
        is returned with a count of 0 and no query is issued.

    Raises:
        ValueError: if ``window_days`` is not a positive int.
        KeyError: if a dedup column is missing from ``all_columns``.

    Note:
        Rows are only compared with data already stored. Duplicate keys
        inside ``all_values`` itself are not collapsed here.
    """
    # The window is interpolated into SQL, so only accept a positive int.
    if not isinstance(window_days, int) or window_days <= 0:
        raise ValueError(f"window_days must be a positive int, got {window_days!r}")

    dedup_cols = config.dedup_columns
    if not dedup_cols or not all_values:
        return all_values, 0

    # Table and column names come from the platform config, not user input.
    table = f"job_data.{config.table}"
    col_idx = {name: i for i, name in enumerate(all_columns)}
    key_idxs = [col_idx[col] for col in dedup_cols]
    lead_idx = key_idxs[0]

    # Rows with an empty leading key are not used as lookup keys.
    candidate_keys = list({str(row[lead_idx]) for row in all_values if row[lead_idx]})
    if not candidate_keys:
        return all_values, 0

    query = (
        f"SELECT {', '.join(dedup_cols)} FROM {table} "
        f"WHERE {dedup_cols[0]} IN {{keys:Array(String)}} "
        f"AND created_at > now() - INTERVAL {window_days} DAY"
    )
    existing = await client.query(query, parameters={"keys": candidate_keys})

    width = len(key_idxs)
    existing_keys = {
        tuple(str(r[pos]) for pos in range(width)) for r in existing.result_rows
    }
    filtered = [
        row for row in all_values
        if tuple(str(row[i]) for i in key_idxs) not in existing_keys
    ]
    return filtered, len(all_values) - len(filtered)