Plan 01 - DATA-01: 30-day window dedup fix: - dedup.py: both single-field and double-field SQL queries now include AND created_at > now() - INTERVAL 30 DAY - tests/ingest/test_dedup.py: 6 mock tests validating 30-day window Plan 02 - DATA-04: company vs search job channel separation: - schemas/ingest.py: ChannelType.COMPANY = 'company' - configs/boss.py: register channel='company' config - configs/qcwy.py: register channel='company' config - configs/zhilian.py: register channel='company' config - company_jobs_sync.py: store_batch(..., 'mini', ...) → (..., 'company', ...) DATA-02: confirmed already complete (job.py has /data/batch-async endpoint) DATA-03: confirmed already complete (company_cleaner.py full pipeline) Full regression: 112 passed (106 existing + 6 new)
156 lines
5.4 KiB
Python
156 lines
5.4 KiB
Python
"""
|
||
dedup.py 测试 — DATA-01 30天窗口去重验证
|
||
使用 asyncio.run() 运行异步函数(避免依赖 pytest-asyncio)
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
from unittest.mock import AsyncMock, MagicMock
|
||
|
||
import pytest
|
||
|
||
from app.services.ingest.dedup import batch_dedup_filter, build_insert_row
|
||
from app.services.ingest.registry import PlatformConfig, DedupFieldSpec
|
||
|
||
|
||
# ─── 辅助工厂函数 ─────────────────────────────────────
|
||
def _make_config_single(table: str = "boss_job") -> PlatformConfig:
    """Build a BOSS mini-channel job config that dedups on ``job_id`` only.

    Args:
        table: Table name stored on the returned config.

    Returns:
        A ``PlatformConfig`` with a single dedup field whose extractor reads
        ``jobId`` from the incoming payload dict.
    """
    job_id_spec = DedupFieldSpec(
        column="job_id",
        extractor=lambda payload: payload.get("jobId"),
    )
    return PlatformConfig(
        platform="boss",
        channel="mini",
        data_type="job",
        table=table,
        dedup_fields=(job_id_spec,),
    )
|
||
|
||
|
||
def _make_config_double(table: str = "qcwy_job") -> PlatformConfig:
    """Build a QCWY mini-channel job config that dedups on two columns.

    The dedup key is the pair (``job_id``, ``update_date_time``), extracted
    from the payload's ``jobId`` and ``updateDt`` entries respectively.

    Args:
        table: Table name stored on the returned config.

    Returns:
        A ``PlatformConfig`` carrying both dedup field specs, in that order.
    """
    job_id_spec = DedupFieldSpec(
        column="job_id",
        extractor=lambda payload: payload.get("jobId"),
    )
    update_time_spec = DedupFieldSpec(
        column="update_date_time",
        extractor=lambda payload: payload.get("updateDt"),
    )
    return PlatformConfig(
        platform="qcwy",
        channel="mini",
        data_type="job",
        table=table,
        dedup_fields=(job_id_spec, update_time_spec),
    )
|
||
|
||
|
||
def _make_mock_client(rows: list) -> AsyncMock:
|
||
mock_result = MagicMock()
|
||
mock_result.result_rows = rows
|
||
mock_client = AsyncMock()
|
||
mock_client.query.return_value = mock_result
|
||
return mock_client
|
||
|
||
|
||
def _make_rows(job_ids: list[str]) -> tuple[list[str], list[list]]:
|
||
columns = ["id", "json_data", "channel", "created_at", "updated_at", "job_id"]
|
||
values = [[0, "{}", "mini", None, None, jid] for jid in job_ids]
|
||
return columns, values
|
||
|
||
|
||
def _make_double_rows(pairs: list[tuple[str, str]]) -> tuple[list[str], list[list]]:
|
||
columns = ["id", "json_data", "channel", "created_at", "updated_at", "job_id", "update_date_time"]
|
||
values = [[0, "{}", "mini", None, None, jid, udt] for jid, udt in pairs]
|
||
return columns, values
|
||
|
||
|
||
def _run(coro):
|
||
return asyncio.get_event_loop().run_until_complete(coro)
|
||
|
||
|
||
# ─── 测试:单字段去重 ─────────────────────────────────
|
||
|
||
def test_single_field_dedup_within_30_days():
    """A job_id already stored within 30 days is treated as a duplicate and dropped."""
    # The mocked lookup reports JOB001 as already present in the table.
    mock_client = _make_mock_client([("JOB001",)])
    config = _make_config_single()
    incoming = [{"jobId": "JOB001"}, {"jobId": "JOB002"}]
    columns, values = _make_rows(["JOB001", "JOB002"])

    filtered, ignored = _run(
        batch_dedup_filter(mock_client, config, incoming, columns, values)
    )

    job_id_idx = columns.index("job_id")
    assert ignored == 1
    assert len(filtered) == 1
    assert filtered[0][job_id_idx] == "JOB002"

    # The dedup lookup must be restricted to the 30-day window.
    issued_sql = mock_client.query.call_args.args[0]
    sql = issued_sql
    assert "INTERVAL 30 DAY" in sql, f"SQL 应包含 INTERVAL 30 DAY,实际: {sql}"
|
||
|
||
|
||
def test_single_field_dedup_no_existing():
    """When the lookup finds no stored match, every incoming row is kept."""
    mock_client = _make_mock_client([])
    config = _make_config_single()
    new_ids = ["JOB_NEW_1", "JOB_NEW_2"]
    columns, values = _make_rows(new_ids)
    incoming = [{"jobId": job_id} for job_id in new_ids]

    filtered, ignored = _run(
        batch_dedup_filter(mock_client, config, incoming, columns, values)
    )

    assert ignored == 0
    assert len(filtered) == 2
|
||
|
||
|
||
def test_single_field_dedup_sql_has_30_day_window():
    """The single-field dedup query issued to the client carries the 30-day window."""
    mock_client = _make_mock_client([])
    config = _make_config_single()
    columns, values = _make_rows(["JOB_X"])

    _run(batch_dedup_filter(mock_client, config, [{"jobId": "JOB_X"}], columns, values))

    mock_client.query.assert_called()
    issued_sql = mock_client.query.call_args.args[0]
    assert "INTERVAL 30 DAY" in issued_sql
|
||
|
||
|
||
# ─── 测试:双字段去重 ─────────────────────────────────
|
||
|
||
def test_double_field_dedup_within_30_days():
    """A row matching both dedup fields within 30 days is dropped as a duplicate.

    The mocked lookup reports (JOB001, 2026-03-01) as already stored, so only
    the JOB002 row may survive. The test pins which row survives, not just how
    many, mirroring the single-field test.
    """
    mock_client = _make_mock_client([("JOB001", "2026-03-01")])  # already stored
    config = _make_config_double()
    pairs = [("JOB001", "2026-03-01"), ("JOB002", "2026-03-10")]
    columns, values = _make_double_rows(pairs)
    rows = [{"jobId": job_id, "updateDt": updated} for job_id, updated in pairs]

    filtered, ignored = _run(batch_dedup_filter(mock_client, config, rows, columns, values))

    assert ignored == 1
    assert len(filtered) == 1
    # A count-only check would also pass if the wrong row were dropped.
    assert filtered[0][columns.index("job_id")] == "JOB002"
    assert filtered[0][columns.index("update_date_time")] == "2026-03-10"

    # The dedup lookup must be restricted to the 30-day window.
    sql = mock_client.query.call_args[0][0]
    assert "INTERVAL 30 DAY" in sql
|
||
|
||
|
||
def test_dedup_empty_input():
    """Empty input short-circuits: nothing is filtered and the client is never queried."""
    untouched_client = AsyncMock()
    columns = ["id", "json_data", "channel", "created_at", "updated_at", "job_id"]
    config = _make_config_single()

    filtered, ignored = _run(batch_dedup_filter(untouched_client, config, [], columns, []))

    assert ignored == 0
    assert filtered == []
    untouched_client.query.assert_not_called()
|
||
|
||
|
||
# ─── 测试:build_insert_row ────────────────────────────
|
||
|
||
def test_build_insert_row_has_channel():
    """build_insert_row emits channel and job_id columns carrying the expected values."""
    columns, values = build_insert_row(_make_config_single(), {"jobId": "JOB999"}, "mini")

    # Each expected column must be present, and its value must match.
    expected = {"channel": "mini", "job_id": "JOB999"}
    for column, wanted in expected.items():
        assert column in columns
        assert values[columns.index(column)] == wanted
|