diff --git a/app/schemas/ingest.py b/app/schemas/ingest.py new file mode 100644 index 0000000..6e55903 --- /dev/null +++ b/app/schemas/ingest.py @@ -0,0 +1,43 @@ +from enum import Enum +from typing import Dict, Any, List, Optional +from pydantic import BaseModel, Field + + +class PlatformType(str, Enum): + BOSS = "boss" + QCWY = "qcwy" + ZHILIAN = "zhilian" + + +class ChannelType(str, Enum): + MINI = "mini" + WEB = "web" + APP = "app" + COMPANY = "company" # 公司关联职位(与搜索职位 mini 区分) + + +class DataType(str, Enum): + JOB = "job" + COMPANY = "company" + + +class IngestSingleRequest(BaseModel): + data: Dict[str, Any] = Field(..., description="要存储的数据") + data_type: DataType = Field(..., description="数据类型") + platform: PlatformType = Field(..., description="平台类型") + channel: ChannelType = Field(ChannelType.MINI, description="渠道类型") + check_duplicate: bool = Field(True, description="是否检查重复") + + +class IngestBatchRequest(BaseModel): + data_list: List[Dict[str, Any]] = Field(..., description="要存储的数据列表") + data_type: DataType = Field(..., description="数据类型") + platform: PlatformType = Field(..., description="平台类型") + channel: ChannelType = Field(ChannelType.MINI, description="渠道类型") + check_duplicate: bool = Field(True, description="是否检查重复") + + +class IngestResponse(BaseModel): + code: int = 200 + message: str = "ok" + data: Optional[Dict[str, Any]] = None diff --git a/app/services/company_jobs_sync.py b/app/services/company_jobs_sync.py new file mode 100644 index 0000000..f647588 --- /dev/null +++ b/app/services/company_jobs_sync.py @@ -0,0 +1,133 @@ +import asyncio +import time +from typing import Any, Dict, List, Optional + +from loguru import logger + +from app.core.clickhouse import clickhouse_manager +from app.models.token import BossToken +from app.services.crawler.boss import BossService +from app.services.crawler.qcwy import QcwyService +from app.services.crawler.zhilian import ZhilianService +from app.services.ingest import IngestService + + +def _qcwy_extract_items(resp: dict) -> list: + if not isinstance(resp, dict): + return [] + rb = resp.get("resultbody") or resp.get("resultBody") + if isinstance(rb, dict): + job_node = rb.get("job") + if isinstance(job_node, dict) and isinstance(job_node.get("items"), list): + return job_node["items"] + for key in ("items", "list", "jobs", "jobList"): + val = resp.get(key) + if isinstance(val, list): + return val + return [] + + +class CompanyJobsSyncService: + _TOKEN_REFRESH_INTERVAL = 3600 + + def __init__(self): + self.boss_service = BossService() + self.qcwy_service = QcwyService() + self.zhilian_service = ZhilianService() + self.data_router: Optional[IngestService] = None + self._boss_token_loaded = False + self._token_loaded_at: float = 0 + + def set_proxy(self, proxy: Optional[str]) -> None: + self.boss_service.set_proxy(proxy) + self.qcwy_service.set_proxy(proxy) + self.zhilian_service.set_proxy(proxy) + + async def get_data_router(self) -> IngestService: + if not self.data_router: + client = await clickhouse_manager.get_client() + self.data_router = IngestService(client) + return self.data_router + + async def _ensure_boss_token_loaded(self) -> None: + now = time.time() + if ( + self._boss_token_loaded + and self.boss_service.login_data.get("mpt") + and now - self._token_loaded_at < self._TOKEN_REFRESH_INTERVAL + ): + return + token_obj = await BossToken.filter(is_active=True).order_by("-updated_at").first() + if not token_obj: + logger.warning("BossToken not found or inactive in CompanyJobsSyncService") + return + self.boss_service.set_login_data(token_obj.mpt or "", "") + self._boss_token_loaded = True + self._token_loaded_at = now + + async def sync_company_jobs(self, source: str, company_id: str) -> Dict[str, Any]: + router = await self.get_data_router() + + if source == "boss": + await self._ensure_boss_token_loaded() + data = await asyncio.to_thread(self.boss_service.get_company_jobs_by_id, company_id) + jobs = self._extract_boss_jobs(data) + elif source == "qcwy": + data = await asyncio.to_thread(self.qcwy_service.get_company_jobs_by_id, company_id) + jobs = self._extract_qcwy_jobs(data) + elif source == "zhilian": + data = await asyncio.to_thread(self.zhilian_service.get_company_jobs_by_id, company_id) + jobs = self._extract_zhilian_jobs(data) + else: + raise ValueError(f"unsupported source: {source}") + + if not jobs: + return { + "success": True, + "source": source, + "company_id": company_id, + "jobs_fetched": 0, + "stored_success": 0, + "duplicate": 0, + "failed": 0, + "original_data": data, + } + + store_result = await router.store_batch(source, "company", "job", jobs) + return { + "success": True, + "source": source, + "company_id": company_id, + "jobs_fetched": len(jobs), + "stored_success": store_result.get("success", 0), + "duplicate": store_result.get("duplicate", 0), + "failed": store_result.get("failed", 0), + "errors": store_result.get("errors", []), + "original_data": data, + } + + @staticmethod + def _extract_boss_jobs(data: Optional[Dict[str, Any]]) -> List[Dict[str, Any]]: + if not isinstance(data, dict): + return [] + zp_data = data.get("zpData") or {} + if isinstance(zp_data.get("jobList"), list): + return zp_data.get("jobList") or [] + if isinstance(zp_data.get("list"), list): + return zp_data.get("list") or [] + return [] + + @staticmethod + def _extract_qcwy_jobs(data: Optional[Dict[str, Any]]) -> List[Dict[str, Any]]: + jobs_list = _qcwy_extract_items(data or {}) + return jobs_list if isinstance(jobs_list, list) else [] + + @staticmethod + def _extract_zhilian_jobs(data: Optional[Dict[str, Any]]) -> List[Dict[str, Any]]: + if not isinstance(data, dict): + return [] + jobs = data.get("list") + if not isinstance(jobs, list): + data_field = data.get("data") or {} + jobs = data_field.get("list") or [] + return jobs if isinstance(jobs, list) else [] diff --git a/app/services/ingest/configs/boss.py b/app/services/ingest/configs/boss.py new file mode 100644 index 0000000..d70bce8 --- /dev/null +++ b/app/services/ingest/configs/boss.py @@ -0,0 +1,70 @@ +from typing import Dict, Any, Optional + +from app.services.ingest.registry import PlatformConfig, DedupFieldSpec, register +from app.services.ingest.remote_push import safe_get, safe_join + + +def _extract_job_id(data: Dict[str, Any]) -> Optional[str]: + job_base = data.get("jobBaseInfoVO", {}) + val = job_base.get("jobId") if job_base else None + return str(val) if val else None + + +def _extract_company_name(data: Dict[str, Any]) -> Optional[str]: + name = data.get("name") or (data.get("companyFullInfoVO") or {}).get("name") + return str(name) if name else None + + +def _build_boss_push(data: Dict[str, Any]) -> Optional[Dict[str, Any]]: + boss_base = data.get("bossBaseInfoVO") or {} + job_base = data.get("jobBaseInfoVO") or {} + brand = data.get("brandComInfoVO") or {} + return { + "source_type": "Boss直聘", + "name": safe_get(brand, "brandName"), + "common_name": safe_get(boss_base, "brandName"), + "title": safe_get(job_base, "positionName"), + "title_addr": safe_get(job_base, "positionName"), + "description": safe_get(job_base, "jobDesc"), + "education": safe_get(job_base, "degreeName"), + "skill": safe_join(job_base.get("requiredSkills")), + "welfare": safe_join(job_base.get("salaryWelfareInfo")), + "years": safe_get(job_base, "experienceName"), + "salary": f'{safe_get(job_base, "lowSalary")}-{safe_get(job_base, "highSalary")}', + "location": safe_get(job_base, "locationName", "位置信息未找到"), + "position": safe_get(job_base, "locationDesc", "位置信息未找到"), + "job_type": "全职", + "size": safe_get(brand, "scaleName"), + "employer_type": "全职", + "industry": safe_get(brand, "industryName"), + "job_1st_class": "", "job_2nd_class": "", "job_3rd_class": "", "job_4th_class": "", + "date": "", "start_date": "", "end_date": "", + "age": "", "sex": "", "number": "", + "url": f"https://www.zhipin.com/job_detail/{safe_get(job_base, 'encryptJobId')}.html", + "company_id": safe_get(brand, "encryptBrandId"), + "company_name": safe_get(brand, "brandName"), + "company_url": f"https://www.zhipin.com/gongsi/{safe_get(brand, 'encryptBrandId')}.html", + "company_desc": safe_get(brand, "introduce"), + "base_data": data, + } + + +register(PlatformConfig( + platform="boss", channel="mini", data_type="job", + table="boss_job", + dedup_fields=(DedupFieldSpec(column="job_id", extractor=_extract_job_id),), + push_mapper=_build_boss_push, +)) + +register(PlatformConfig( + platform="boss", channel="mini", data_type="company", + table="boss_company", + dedup_fields=(DedupFieldSpec(column="company_name", extractor=_extract_company_name),), +)) + +# 公司关联职位(通过 company_jobs_sync 写入,与搜索职位 mini 区分) +register(PlatformConfig( + platform="boss", channel="company", data_type="job", + table="boss_job", + dedup_fields=(DedupFieldSpec(column="job_id", extractor=_extract_job_id),), +)) diff --git a/app/services/ingest/configs/qcwy.py b/app/services/ingest/configs/qcwy.py new file mode 100644 index 0000000..1ea3a3e --- /dev/null +++ b/app/services/ingest/configs/qcwy.py @@ -0,0 +1,103 @@ +from typing import Dict, Any, Optional + +from app.services.ingest.registry import PlatformConfig, DedupFieldSpec, register +from app.services.ingest.remote_push import safe_join + + +def _extract_job_id(data: Dict[str, Any]) -> Optional[str]: + val = data.get("jobId") + return str(val) if val else None + + +def _extract_update_dt(data: Dict[str, Any]) -> Optional[str]: + val = data.get("updateDateTime") + return str(val) if val else None + + +def _extract_company_name(data: Dict[str, Any]) -> Optional[str]: + name = data.get("companyName") or data.get("company_name") + return str(name) if name else None + + +def _build_qcwy_push(data: Dict[str, Any]) -> Optional[Dict[str, Any]]: + welfare_list = data.get("jobWelfareCodeDataList") + if isinstance(welfare_list, list): + welfare_str = ",".join( + str(item.get("chineseTitle") or item.get("typeTitle") or item.get("englishTitle") or item.get("code")) + for item in welfare_list if isinstance(item, dict) + ) + elif isinstance(welfare_list, str): + welfare_str = welfare_list.replace("[", "").replace("]", "") + else: + welfare_str = "" + + raw_location = data.get("location") or "" + if not raw_location: + work_loc = data.get("workLocation") or {} + raw_location = work_loc.get("workAddress") or work_loc.get("address") or "" + location_val = raw_location or "位置信息未找到" + + raw_area = data.get("jobAreaString") or "" + if not raw_area: + level_detail = data.get("jobAreaLevelDetail") or {} + city_str = level_detail.get("cityString") or "" + landmark_str = level_detail.get("landMarkString") or "" + raw_area = f"{city_str}{landmark_str}".strip() + area_val = raw_area or "位置信息未找到" + + return { + "source_type": "前程无忧", + "name": data.get("companyName"), + "title": data.get("jobName"), + "title_addr": data.get("jobName"), + "description": data.get("jobDescribe"), + "age": "", "sex": "", "number": "", + "education": data.get("degreeString"), + "skill": safe_join(data.get("jobTagsForOrder")), + "welfare": welfare_str, + "years": data.get("workYearString"), + "salary": f'{data.get("jobSalaryMax", "")}-{data.get("jobSalaryMin", "")}', + "location": location_val, + "position": area_val, + "date": data.get("confirmDateString"), + "start_date": data.get("confirmDateString"), + "end_date": "", + "job_type": data.get("termStr"), + "size": data.get("companySizeString"), + "employer_type": data.get("companyTypeString"), + "industry": f'{data.get("major1Str", "")}-{data.get("major2Str", "")}', + "job_1st_class": "", "job_2nd_class": "", "job_3rd_class": "", "job_4th_class": "", + "url": data.get("jobHref"), + "company_id": data.get("coId"), + "company_name": data.get("fullCompanyName"), + "company_url": data.get("companyHref"), + "company_desc": data.get("company_desc", ""), + "base_data": data, + } + + +register(PlatformConfig( + platform="qcwy", channel="mini", data_type="job", + table="qcwy_job", + dedup_fields=( + DedupFieldSpec(column="job_id", extractor=_extract_job_id), + DedupFieldSpec(column="update_date_time", extractor=_extract_update_dt), + ), + push_mapper=_build_qcwy_push, +)) + +register(PlatformConfig( + platform="qcwy", channel="mini", data_type="company", + table="qcwy_company", + dedup_fields=(DedupFieldSpec(column="company_name", extractor=_extract_company_name),), +)) + +# 公司关联职位(通过 company_jobs_sync 写入,与搜索职位 mini 区分) +register(PlatformConfig( + platform="qcwy", channel="company", data_type="job", + table="qcwy_job", + dedup_fields=( + DedupFieldSpec(column="job_id", extractor=_extract_job_id), + DedupFieldSpec(column="update_date_time", extractor=_extract_update_dt), + ), +)) diff --git a/app/services/ingest/configs/zhilian.py b/app/services/ingest/configs/zhilian.py new file mode 100644 index 0000000..6a85ee8 --- /dev/null +++ b/app/services/ingest/configs/zhilian.py @@ -0,0 +1,81 @@ +from typing import Dict, Any, Optional + +from app.services.ingest.registry import PlatformConfig, DedupFieldSpec, register +from app.services.ingest.remote_push import safe_get, safe_join + + +def _extract_number(data: Dict[str, Any]) -> Optional[str]: + val = data.get("number") + return str(val) if val else None + + +def _extract_fpt(data: Dict[str, Any]) -> Optional[str]: + val = data.get("firstPublishTime") + return str(val) if val else None + + +def _extract_company_name(data: Dict[str, Any]) -> Optional[str]: + name = data.get("companyName") or data.get("name") + return str(name) if name else None + + +def _build_zhilian_push(data: Dict[str, Any]) -> Optional[Dict[str, Any]]: + skill_labels = data.get("skillLabel") or [] + skill_values = [tag["value"] for tag in skill_labels if isinstance(tag, dict) and "value" in tag] + return { + "source_type": "智联招聘", + "name": safe_get(data, "companyName"), + "common_name": safe_get(data, "companyName"), + "title": safe_get(data, "name"), + "title_addr": safe_get(data, "name"), + "description": safe_get(data, "jobSummary"), + "education": safe_get(data, "education"), + "skill": safe_join(skill_values), + "welfare": "", + "years": safe_get(data, "workingExp"), + "salary": safe_get(data, "salary60"), + "location": f"{safe_get(data, 'workCity')}{safe_get(data, 'cityDistrict')}", + "position": f"{safe_get(data, 'workCity')}{safe_get(data, 'cityDistrict')}", + "job_type": safe_get(data, "workType"), + "size": safe_get(data, "companySize"), + "employer_type": safe_get(data, "propertyName"), + "industry": safe_get(data, "industryName"), + "job_1st_class": "", "job_2nd_class": "", "job_3rd_class": "", "job_4th_class": "", + "date": safe_get(data, "firstPublishTime"), + "start_date": "", "end_date": "", + "age": "", "sex": "", + "number": str(safe_get(data, "recruitNumber")), + "url": safe_get(data, "positionURL"), + "company_id": str(safe_get(data, "companyId")), + "company_name": safe_get(data, "companyName"), + "company_url": safe_get(data, "companyUrl"), + "company_desc": safe_get(data, "companyDesc"), + "base_data": data, + } + + +register(PlatformConfig( + platform="zhilian", channel="mini", data_type="job", + table="zhilian_job", + dedup_fields=( + DedupFieldSpec(column="number", extractor=_extract_number), + DedupFieldSpec(column="first_publish_time", extractor=_extract_fpt), + ), + push_mapper=_build_zhilian_push, +)) + +register(PlatformConfig( + platform="zhilian", channel="mini", data_type="company", + table="zhilian_company", + dedup_fields=(DedupFieldSpec(column="company_name", extractor=_extract_company_name),), +)) + +# 公司关联职位(通过 company_jobs_sync 写入,与搜索职位 mini 区分) +register(PlatformConfig( + platform="zhilian", channel="company", data_type="job", + table="zhilian_job", + dedup_fields=( + DedupFieldSpec(column="number", extractor=_extract_number), + DedupFieldSpec(column="first_publish_time", extractor=_extract_fpt), + ), +)) diff --git a/app/services/ingest/dedup.py b/app/services/ingest/dedup.py new file mode 100644 index 0000000..eef60dd --- /dev/null +++ b/app/services/ingest/dedup.py @@ -0,0 +1,88 @@ +import json +from datetime import datetime +from typing import Dict, Any, List, Tuple + +from clickhouse_connect.driver import AsyncClient + +from app.log import logger +from app.services.ingest.registry import PlatformConfig + + +def build_insert_row( + config: PlatformConfig, data: Dict[str, Any], channel: str, +) -> Tuple[List[str], List[Any]]: + """构建 ClickHouse 插入行(含 channel 列)""" + now = datetime.now() + columns = ["id", "json_data", "channel", "created_at", "updated_at"] + values: List[Any] = [0, json.dumps(data, ensure_ascii=False), channel, now, now] + + for spec in config.dedup_fields: + extracted = spec.extractor(data) + columns.append(spec.column) + values.append(str(extracted) if extracted else "") + + return columns, values + + +async def batch_dedup_filter( + client: AsyncClient, + config: PlatformConfig, + rows: List[Dict[str, Any]], + all_columns: List[str], + all_values: List[List[Any]], +) -> Tuple[List[List[Any]], int]: + """批量去重过滤,返回 (过滤后的 values 列表, 被忽略数量)""" + dedup_cols = config.dedup_columns + if not dedup_cols or not all_values: + return all_values, 0 + + table = f"job_data.{config.table}" + + # 建立 column name -> index 映射 + col_idx = {name: i for i, name in enumerate(all_columns)} + + if len(dedup_cols) == 1: + key_col = dedup_cols[0] + idx = col_idx[key_col] + candidate_keys = list({str(row[idx]) for row in all_values if row[idx]}) + if not candidate_keys: + return all_values, 0 + + query = ( + f"SELECT {key_col} FROM {table} " + f"WHERE {key_col} IN {{keys:Array(String)}} " + f"AND created_at > now() - INTERVAL 30 DAY" + ) + existing = await client.query(query, parameters={"keys": candidate_keys}) + existing_set = {str(r[0]) for r in existing.result_rows} + + filtered = [row for row in all_values if str(row[idx]) not in existing_set] + return filtered, len(all_values) - len(filtered) + + if len(dedup_cols) == 2: + c1, c2 = dedup_cols + idx1, idx2 = col_idx[c1], col_idx[c2] + candidate_c1 = list({str(row[idx1]) for row in all_values if row[idx1]}) + if not candidate_c1: + return all_values, 0 + + query = ( + f"SELECT {c1}, {c2} FROM {table} " + f"WHERE {c1} IN {{keys:Array(String)}} " + f"AND created_at > now() - INTERVAL 30 DAY" + ) + existing = await client.query(query, parameters={"keys": candidate_c1}) + existing_map: Dict[str, set] = {} + for r in existing.result_rows: + existing_map.setdefault(str(r[0]), set()).add(str(r[1])) + + filtered = [ + row for row in all_values + if str(row[idx1]) not in existing_map + or str(row[idx2]) not in existing_map.get(str(row[idx1]), set()) + ] + return filtered, len(all_values) - len(filtered) + + # 不支持 3+ 列去重,直接返回 + logger.warning(f"不支持 {len(dedup_cols)} 列去重,跳过过滤") + return all_values, 0 diff --git a/tests/ingest/__init__.py b/tests/ingest/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/ingest/test_dedup.py b/tests/ingest/test_dedup.py new file mode 100644 index 0000000..14953ff --- /dev/null +++ b/tests/ingest/test_dedup.py @@ -0,0 +1,155 @@ +""" +dedup.py 测试 — DATA-01 30天窗口去重验证 +使用 asyncio.run() 运行异步函数(避免依赖 pytest-asyncio) +""" +from __future__ import annotations + +import asyncio +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from app.services.ingest.dedup import batch_dedup_filter, build_insert_row +from app.services.ingest.registry import PlatformConfig, DedupFieldSpec + + +# ─── 辅助工厂函数 ───────────────────────────────────── +def _make_config_single(table: str = "boss_job") -> PlatformConfig: + return PlatformConfig( + platform="boss", channel="mini", data_type="job", + table=table, + dedup_fields=(DedupFieldSpec(column="job_id", extractor=lambda d: d.get("jobId")),), + ) + + +def _make_config_double(table: str = "qcwy_job") -> PlatformConfig: + return PlatformConfig( + platform="qcwy", channel="mini", data_type="job", + table=table, + dedup_fields=( + DedupFieldSpec(column="job_id", extractor=lambda d: d.get("jobId")), + DedupFieldSpec(column="update_date_time", extractor=lambda d: d.get("updateDt")), + ), + ) + + +def _make_mock_client(rows: list) -> AsyncMock: + mock_result = MagicMock() + mock_result.result_rows = rows + mock_client = AsyncMock() + mock_client.query.return_value = mock_result + return mock_client + + +def _make_rows(job_ids: list[str]) -> tuple[list[str], list[list]]: + columns = ["id", "json_data", "channel", "created_at", "updated_at", "job_id"] + values = [[0, "{}", "mini", None, None, jid] for jid in job_ids] + return columns, values + + +def _make_double_rows(pairs: list[tuple[str, str]]) -> tuple[list[str], list[list]]: + columns = ["id", "json_data", "channel", "created_at", "updated_at", "job_id", "update_date_time"] + values = [[0, "{}", "mini", None, None, jid, udt] for jid, udt in pairs] + return columns, values + + +def _run(coro): + return asyncio.get_event_loop().run_until_complete(coro) + + +# ─── 测试:单字段去重 ───────────────────────────────── + +def test_single_field_dedup_within_30_days(): + """30 天内有相同 job_id → 视为重复,过滤""" + mock_client = _make_mock_client([("JOB001",)]) # 已存在 + + config = _make_config_single() + columns, values = _make_rows(["JOB001", "JOB002"]) + rows = [{"jobId": "JOB001"}, {"jobId": "JOB002"}] + + filtered, ignored = _run(batch_dedup_filter(mock_client, config, rows, columns, values)) + + assert ignored == 1 + assert len(filtered) == 1 + assert filtered[0][5] == "JOB002" + + # 验证 SQL 包含 30 天窗口 + sql = mock_client.query.call_args[0][0] + assert "INTERVAL 30 DAY" in sql, f"SQL 应包含 INTERVAL 30 DAY,实际: {sql}" + + +def test_single_field_dedup_no_existing(): + """无现有记录 → 允许全部入库""" + mock_client = _make_mock_client([]) + + config = _make_config_single() + columns, values = _make_rows(["JOB_NEW_1", "JOB_NEW_2"]) + rows = [{"jobId": "JOB_NEW_1"}, {"jobId": "JOB_NEW_2"}] + + filtered, ignored = _run(batch_dedup_filter(mock_client, config, rows, columns, values)) + + assert ignored == 0 + assert len(filtered) == 2 + + +def test_single_field_dedup_sql_has_30_day_window(): + """验证单字段 dedup SQL 中包含 30 天时间窗口""" + mock_client = _make_mock_client([]) + + config = _make_config_single() + columns, values = _make_rows(["JOB_X"]) + rows = [{"jobId": "JOB_X"}] + + _run(batch_dedup_filter(mock_client, config, rows, columns, values)) + + assert mock_client.query.called + sql = mock_client.query.call_args[0][0] + assert "INTERVAL 30 DAY" in sql + + +# ─── 测试:双字段去重 ───────────────────────────────── + +def test_double_field_dedup_within_30_days(): + """30 天内双字段都匹配 → 视为重复,过滤""" + mock_client = _make_mock_client([("JOB001", "2026-03-01")]) # 已存在 + + config = _make_config_double() + columns, values = _make_double_rows([("JOB001", "2026-03-01"), ("JOB002", "2026-03-10")]) + rows = [{"jobId": "JOB001", "updateDt": "2026-03-01"}, {"jobId": "JOB002", "updateDt": "2026-03-10"}] + + filtered, ignored = _run(batch_dedup_filter(mock_client, config, rows, columns, values)) + + assert ignored == 1 + assert len(filtered) == 1 + sql = mock_client.query.call_args[0][0] + assert "INTERVAL 30 DAY" in sql + + +def test_dedup_empty_input(): + """空 all_values → 直接返回,不调用 ClickHouse""" + mock_client = AsyncMock() + config = _make_config_single() + columns = ["id", "json_data", "channel", "created_at", "updated_at", "job_id"] + + filtered, ignored = _run(batch_dedup_filter(mock_client, config, [], columns, [])) + + assert ignored == 0 + assert filtered == [] + mock_client.query.assert_not_called() + + +# ─── 测试:build_insert_row ──────────────────────────── + +def test_build_insert_row_has_channel(): + """build_insert_row 生成的列中包含 channel 和 job_id""" + config = _make_config_single() + data = {"jobId": "JOB999"} + + columns, values = build_insert_row(config, data, "mini") + + assert "channel" in columns + channel_idx = columns.index("channel") + assert values[channel_idx] == "mini" + assert "job_id" in columns + job_id_idx = columns.index("job_id") + assert values[job_id_idx] == "JOB999"