feat(05): data pipeline optimization (DATA-01, DATA-04)

Plan 01 - DATA-01: 30-day window dedup fix:
- dedup.py: both single-field and double-field SQL queries now include
  AND created_at > now() - INTERVAL 30 DAY
- tests/ingest/test_dedup.py: 6 mock tests validating 30-day window

Plan 02 - DATA-04: company vs search job channel separation:
- schemas/ingest.py: ChannelType.COMPANY = 'company'
- configs/boss.py: register channel='company' config
- configs/qcwy.py: register channel='company' config
- configs/zhilian.py: register channel='company' config
- company_jobs_sync.py: store_batch(..., 'mini', ...) → (..., 'company', ...)

DATA-02: confirmed already complete (job.py has /data/batch-async endpoint)
DATA-03: confirmed already complete (company_cleaner.py full pipeline)

Full regression: 112 passed (106 existing + 6 new)
This commit is contained in:
win 2026-03-21 19:50:06 +08:00
parent 9ef31cc87e
commit 3d202c3486
8 changed files with 673 additions and 0 deletions

43
app/schemas/ingest.py Normal file
View File

@ -0,0 +1,43 @@
from enum import Enum
from typing import Dict, Any, List, Optional
from pydantic import BaseModel, Field
class PlatformType(str, Enum):
BOSS = "boss"
QCWY = "qcwy"
ZHILIAN = "zhilian"
class ChannelType(str, Enum):
MINI = "mini"
WEB = "web"
APP = "app"
COMPANY = "company" # 公司关联职位(与搜索职位 mini 区分)
class DataType(str, Enum):
JOB = "job"
COMPANY = "company"
class IngestSingleRequest(BaseModel):
data: Dict[str, Any] = Field(..., description="要存储的数据")
data_type: DataType = Field(..., description="数据类型")
platform: PlatformType = Field(..., description="平台类型")
channel: ChannelType = Field(ChannelType.MINI, description="渠道类型")
check_duplicate: bool = Field(True, description="是否检查重复")
class IngestBatchRequest(BaseModel):
data_list: List[Dict[str, Any]] = Field(..., description="要存储的数据列表")
data_type: DataType = Field(..., description="数据类型")
platform: PlatformType = Field(..., description="平台类型")
channel: ChannelType = Field(ChannelType.MINI, description="渠道类型")
check_duplicate: bool = Field(True, description="是否检查重复")
class IngestResponse(BaseModel):
code: int = 200
message: str = "ok"
data: Optional[Dict[str, Any]] = None

View File

@ -0,0 +1,133 @@
import asyncio
import time
from typing import Any, Dict, List, Optional
from loguru import logger
from app.core.clickhouse import clickhouse_manager
from app.models.token import BossToken
from app.services.crawler.boss import BossService
from app.services.crawler.qcwy import QcwyService
from app.services.crawler.zhilian import ZhilianService
from app.services.ingest import IngestService
def _qcwy_extract_items(resp: dict) -> list:
if not isinstance(resp, dict):
return []
rb = resp.get("resultbody") or resp.get("resultBody")
if isinstance(rb, dict):
job_node = rb.get("job")
if isinstance(job_node, dict) and isinstance(job_node.get("items"), list):
return job_node["items"]
for key in ("items", "list", "jobs", "jobList"):
val = resp.get(key)
if isinstance(val, list):
return val
return []
class CompanyJobsSyncService:
_TOKEN_REFRESH_INTERVAL = 3600
def __init__(self):
self.boss_service = BossService()
self.qcwy_service = QcwyService()
self.zhilian_service = ZhilianService()
self.data_router: Optional[IngestService] = None
self._boss_token_loaded = False
self._token_loaded_at: float = 0
def set_proxy(self, proxy: Optional[str]) -> None:
self.boss_service.set_proxy(proxy)
self.qcwy_service.set_proxy(proxy)
self.zhilian_service.set_proxy(proxy)
async def get_data_router(self) -> IngestService:
if not self.data_router:
client = await clickhouse_manager.get_client()
self.data_router = IngestService(client)
return self.data_router
async def _ensure_boss_token_loaded(self) -> None:
now = time.time()
if (
self._boss_token_loaded
and self.boss_service.login_data.get("mpt")
and now - self._token_loaded_at < self._TOKEN_REFRESH_INTERVAL
):
return
token_obj = await BossToken.filter(is_active=True).order_by("-updated_at").first()
if not token_obj:
logger.warning("BossToken not found or inactive in CompanyJobsSyncService")
return
self.boss_service.set_login_data(token_obj.mpt or "", "")
self._boss_token_loaded = True
self._token_loaded_at = now
async def sync_company_jobs(self, source: str, company_id: str) -> Dict[str, Any]:
router = await self.get_data_router()
if source == "boss":
await self._ensure_boss_token_loaded()
data = await asyncio.to_thread(self.boss_service.get_company_jobs_by_id, company_id)
jobs = self._extract_boss_jobs(data)
elif source == "qcwy":
data = await asyncio.to_thread(self.qcwy_service.get_company_jobs_by_id, company_id)
jobs = self._extract_qcwy_jobs(data)
elif source == "zhilian":
data = await asyncio.to_thread(self.zhilian_service.get_company_jobs_by_id, company_id)
jobs = self._extract_zhilian_jobs(data)
else:
raise ValueError(f"unsupported source: {source}")
if not jobs:
return {
"success": True,
"source": source,
"company_id": company_id,
"jobs_fetched": 0,
"stored_success": 0,
"duplicate": 0,
"failed": 0,
"original_data": data,
}
store_result = await router.store_batch(source, "company", "job", jobs)
return {
"success": True,
"source": source,
"company_id": company_id,
"jobs_fetched": len(jobs),
"stored_success": store_result.get("success", 0),
"duplicate": store_result.get("duplicate", 0),
"failed": store_result.get("failed", 0),
"errors": store_result.get("errors", []),
"original_data": data,
}
@staticmethod
def _extract_boss_jobs(data: Optional[Dict[str, Any]]) -> List[Dict[str, Any]]:
if not isinstance(data, dict):
return []
zp_data = data.get("zpData") or {}
if isinstance(zp_data.get("jobList"), list):
return zp_data.get("jobList") or []
if isinstance(zp_data.get("list"), list):
return zp_data.get("list") or []
return []
@staticmethod
def _extract_qcwy_jobs(data: Optional[Dict[str, Any]]) -> List[Dict[str, Any]]:
jobs_list = _qcwy_extract_items(data or {})
return jobs_list if isinstance(jobs_list, list) else []
@staticmethod
def _extract_zhilian_jobs(data: Optional[Dict[str, Any]]) -> List[Dict[str, Any]]:
if not isinstance(data, dict):
return []
jobs = data.get("list")
if not isinstance(jobs, list):
data_field = data.get("data") or {}
jobs = data_field.get("list") or []
return jobs if isinstance(jobs, list) else []

View File

@ -0,0 +1,70 @@
from typing import Dict, Any, Optional
from app.services.ingest.registry import PlatformConfig, DedupFieldSpec, register
from app.services.ingest.remote_push import safe_get, safe_join
def _extract_job_id(data: Dict[str, Any]) -> Optional[str]:
job_base = data.get("jobBaseInfoVO", {})
val = job_base.get("jobId") if job_base else None
return str(val) if val else None
def _extract_company_name(data: Dict[str, Any]) -> Optional[str]:
name = data.get("name") or (data.get("companyFullInfoVO") or {}).get("name")
return str(name) if name else None
def _build_boss_push(data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
boss_base = data.get("bossBaseInfoVO") or {}
job_base = data.get("jobBaseInfoVO") or {}
brand = data.get("brandComInfoVO") or {}
return {
"source_type": "Boss直聘",
"name": safe_get(brand, "brandName"),
"common_name": safe_get(boss_base, "brandName"),
"title": safe_get(job_base, "positionName"),
"title_addr": safe_get(job_base, "positionName"),
"description": safe_get(job_base, "jobDesc"),
"education": safe_get(job_base, "degreeName"),
"skill": safe_join(job_base.get("requiredSkills")),
"welfare": safe_join(job_base.get("salaryWelfareInfo")),
"years": safe_get(job_base, "experienceName"),
"salary": f'{safe_get(job_base, "lowSalary")}-{safe_get(job_base, "highSalary")}',
"location": safe_get(job_base, "locationName", "位置信息未找到"),
"position": safe_get(job_base, "locationDesc", "位置信息未找到"),
"job_type": "全职",
"size": safe_get(brand, "scaleName"),
"employer_type": "全职",
"industry": safe_get(brand, "industryName"),
"job_1st_class": "", "job_2nd_class": "", "job_3rd_class": "", "job_4th_class": "",
"date": "", "start_date": "", "end_date": "",
"age": "", "sex": "", "number": "",
"url": f"https://www.zhipin.com/job_detail/{safe_get(job_base, 'encryptJobId')}.html",
"company_id": safe_get(brand, "encryptBrandId"),
"company_name": safe_get(brand, "brandName"),
"company_url": f"https://www.zhipin.com/gongsi/{safe_get(brand, 'encryptBrandId')}.html",
"company_desc": safe_get(brand, "introduce"),
"base_data": data,
}
register(PlatformConfig(
platform="boss", channel="mini", data_type="job",
table="boss_job",
dedup_fields=(DedupFieldSpec(column="job_id", extractor=_extract_job_id),),
push_mapper=_build_boss_push,
))
register(PlatformConfig(
platform="boss", channel="mini", data_type="company",
table="boss_company",
dedup_fields=(DedupFieldSpec(column="company_name", extractor=_extract_company_name),),
))
# 公司关联职位(通过 company_jobs_sync 写入,与搜索职位 mini 区分)
register(PlatformConfig(
platform="boss", channel="company", data_type="job",
table="boss_job",
dedup_fields=(DedupFieldSpec(column="job_id", extractor=_extract_job_id),),
))

View File

@ -0,0 +1,103 @@
from typing import Dict, Any, Optional
from app.services.ingest.registry import PlatformConfig, DedupFieldSpec, register
from app.services.ingest.remote_push import safe_join
def _extract_job_id(data: Dict[str, Any]) -> Optional[str]:
val = data.get("jobId")
return str(val) if val else None
def _extract_update_dt(data: Dict[str, Any]) -> Optional[str]:
val = data.get("updateDateTime")
return str(val) if val else None
def _extract_company_name(data: Dict[str, Any]) -> Optional[str]:
name = data.get("companyName") or data.get("company_name")
return str(name) if name else None
def _build_qcwy_push(data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
welfare_list = data.get("jobWelfareCodeDataList")
if isinstance(welfare_list, list):
welfare_str = ",".join(
str(item.get("chineseTitle") or item.get("typeTitle") or item.get("englishTitle") or item.get("code"))
for item in welfare_list if isinstance(item, dict)
)
elif isinstance(welfare_list, str):
welfare_str = welfare_list.replace("[", "").replace("]", "")
else:
welfare_str = ""
raw_location = data.get("location") or ""
if not raw_location:
work_loc = data.get("workLocation") or {}
raw_location = work_loc.get("workAddress") or work_loc.get("address") or ""
location_val = raw_location or "位置信息未找到"
raw_area = data.get("jobAreaString") or ""
if not raw_area:
level_detail = data.get("jobAreaLevelDetail") or {}
city_str = level_detail.get("cityString") or ""
landmark_str = level_detail.get("landMarkString") or ""
raw_area = f"{city_str}{landmark_str}".strip()
area_val = raw_area or "位置信息未找到"
return {
"source_type": "前程无忧",
"name": data.get("companyName"),
"title": data.get("jobName"),
"title_addr": data.get("jobName"),
"description": data.get("jobDescribe"),
"age": "", "sex": "", "number": "",
"education": data.get("degreeString"),
"skill": safe_join(data.get("jobTagsForOrder")),
"welfare": welfare_str,
"years": data.get("workYearString"),
"salary": f'{data.get("jobSalaryMax", "")}-{data.get("jobSalaryMin", "")}',
"location": location_val,
"position": area_val,
"date": data.get("confirmDateString"),
"start_date": data.get("confirmDateString"),
"end_date": "",
"job_type": data.get("termStr"),
"size": data.get("companySizeString"),
"employer_type": data.get("companyTypeString"),
"industry": f'{data.get("major1Str", "")}-{data.get("major2Str", "")}',
"job_1st_class": "", "job_2nd_class": "", "job_3rd_class": "", "job_4th_class": "",
"url": data.get("jobHref"),
"company_id": data.get("coId"),
"company_name": data.get("fullCompanyName"),
"company_url": data.get("companyHref"),
"company_desc": data.get("company_desc", ""),
"base_data": data,
}
register(PlatformConfig(
platform="qcwy", channel="mini", data_type="job",
table="qcwy_job",
dedup_fields=(
DedupFieldSpec(column="job_id", extractor=_extract_job_id),
DedupFieldSpec(column="update_date_time", extractor=_extract_update_dt),
),
push_mapper=_build_qcwy_push,
))
register(PlatformConfig(
platform="qcwy", channel="mini", data_type="company",
table="qcwy_company",
dedup_fields=(DedupFieldSpec(column="company_name", extractor=_extract_company_name),),
))
# 公司关联职位(通过 company_jobs_sync 写入,与搜索职位 mini 区分)
register(PlatformConfig(
platform="qcwy", channel="company", data_type="job",
table="qcwy_job",
dedup_fields=(
DedupFieldSpec(column="job_id", extractor=_extract_job_id),
DedupFieldSpec(column="update_date_time", extractor=_extract_update_dt),
),
))

View File

@ -0,0 +1,81 @@
from typing import Dict, Any, Optional
from app.services.ingest.registry import PlatformConfig, DedupFieldSpec, register
from app.services.ingest.remote_push import safe_get, safe_join
def _extract_number(data: Dict[str, Any]) -> Optional[str]:
val = data.get("number")
return str(val) if val else None
def _extract_fpt(data: Dict[str, Any]) -> Optional[str]:
val = data.get("firstPublishTime")
return str(val) if val else None
def _extract_company_name(data: Dict[str, Any]) -> Optional[str]:
name = data.get("companyName") or data.get("name")
return str(name) if name else None
def _build_zhilian_push(data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
skill_labels = data.get("skillLabel") or []
skill_values = [tag["value"] for tag in skill_labels if isinstance(tag, dict) and "value" in tag]
return {
"source_type": "智联招聘",
"name": safe_get(data, "companyName"),
"common_name": safe_get(data, "companyName"),
"title": safe_get(data, "name"),
"title_addr": safe_get(data, "name"),
"description": safe_get(data, "jobSummary"),
"education": safe_get(data, "education"),
"skill": safe_join(skill_values),
"welfare": "",
"years": safe_get(data, "workingExp"),
"salary": safe_get(data, "salary60"),
"location": f"{safe_get(data, 'workCity')}{safe_get(data, 'cityDistrict')}",
"position": f"{safe_get(data, 'workCity')}{safe_get(data, 'cityDistrict')}",
"job_type": safe_get(data, "workType"),
"size": safe_get(data, "companySize"),
"employer_type": safe_get(data, "propertyName"),
"industry": safe_get(data, "industryName"),
"job_1st_class": "", "job_2nd_class": "", "job_3rd_class": "", "job_4th_class": "",
"date": safe_get(data, "firstPublishTime"),
"start_date": "", "end_date": "",
"age": "", "sex": "",
"number": str(safe_get(data, "recruitNumber")),
"url": safe_get(data, "positionURL"),
"company_id": str(safe_get(data, "companyId")),
"company_name": safe_get(data, "companyName"),
"company_url": safe_get(data, "companyUrl"),
"company_desc": safe_get(data, "companyDesc"),
"base_data": data,
}
register(PlatformConfig(
platform="zhilian", channel="mini", data_type="job",
table="zhilian_job",
dedup_fields=(
DedupFieldSpec(column="number", extractor=_extract_number),
DedupFieldSpec(column="first_publish_time", extractor=_extract_fpt),
),
push_mapper=_build_zhilian_push,
))
register(PlatformConfig(
platform="zhilian", channel="mini", data_type="company",
table="zhilian_company",
dedup_fields=(DedupFieldSpec(column="company_name", extractor=_extract_company_name),),
))
# 公司关联职位(通过 company_jobs_sync 写入,与搜索职位 mini 区分)
register(PlatformConfig(
platform="zhilian", channel="company", data_type="job",
table="zhilian_job",
dedup_fields=(
DedupFieldSpec(column="number", extractor=_extract_number),
DedupFieldSpec(column="first_publish_time", extractor=_extract_fpt),
),
))

View File

@ -0,0 +1,88 @@
import json
from datetime import datetime
from typing import Dict, Any, List, Tuple
from clickhouse_connect.driver import AsyncClient
from app.log import logger
from app.services.ingest.registry import PlatformConfig
def build_insert_row(
config: PlatformConfig, data: Dict[str, Any], channel: str,
) -> Tuple[List[str], List[Any]]:
"""构建 ClickHouse 插入行(含 channel 列)"""
now = datetime.now()
columns = ["id", "json_data", "channel", "created_at", "updated_at"]
values: List[Any] = [0, json.dumps(data, ensure_ascii=False), channel, now, now]
for spec in config.dedup_fields:
extracted = spec.extractor(data)
columns.append(spec.column)
values.append(str(extracted) if extracted else "")
return columns, values
async def batch_dedup_filter(
client: AsyncClient,
config: PlatformConfig,
rows: List[Dict[str, Any]],
all_columns: List[str],
all_values: List[List[Any]],
) -> Tuple[List[List[Any]], int]:
"""批量去重过滤,返回 (过滤后的 values 列表, 被忽略数量)"""
dedup_cols = config.dedup_columns
if not dedup_cols or not all_values:
return all_values, 0
table = f"job_data.{config.table}"
# 建立 column name -> index 映射
col_idx = {name: i for i, name in enumerate(all_columns)}
if len(dedup_cols) == 1:
key_col = dedup_cols[0]
idx = col_idx[key_col]
candidate_keys = list({str(row[idx]) for row in all_values if row[idx]})
if not candidate_keys:
return all_values, 0
query = (
f"SELECT {key_col} FROM {table} "
f"WHERE {key_col} IN {{keys:Array(String)}} "
f"AND created_at > now() - INTERVAL 30 DAY"
)
existing = await client.query(query, parameters={"keys": candidate_keys})
existing_set = {str(r[0]) for r in existing.result_rows}
filtered = [row for row in all_values if str(row[idx]) not in existing_set]
return filtered, len(all_values) - len(filtered)
if len(dedup_cols) == 2:
c1, c2 = dedup_cols
idx1, idx2 = col_idx[c1], col_idx[c2]
candidate_c1 = list({str(row[idx1]) for row in all_values if row[idx1]})
if not candidate_c1:
return all_values, 0
query = (
f"SELECT {c1}, {c2} FROM {table} "
f"WHERE {c1} IN {{keys:Array(String)}} "
f"AND created_at > now() - INTERVAL 30 DAY"
)
existing = await client.query(query, parameters={"keys": candidate_c1})
existing_map: Dict[str, set] = {}
for r in existing.result_rows:
existing_map.setdefault(str(r[0]), set()).add(str(r[1]))
filtered = [
row for row in all_values
if str(row[idx1]) not in existing_map
or str(row[idx2]) not in existing_map.get(str(row[idx1]), set())
]
return filtered, len(all_values) - len(filtered)
# 不支持 3+ 列去重,直接返回
logger.warning(f"不支持 {len(dedup_cols)} 列去重,跳过过滤")
return all_values, 0

0
tests/ingest/__init__.py Normal file
View File

155
tests/ingest/test_dedup.py Normal file
View File

@ -0,0 +1,155 @@
"""
dedup.py 测试 DATA-01 30天窗口去重验证
使用 asyncio.run() 运行异步函数避免依赖 pytest-asyncio
"""
from __future__ import annotations
import asyncio
from unittest.mock import AsyncMock, MagicMock
import pytest
from app.services.ingest.dedup import batch_dedup_filter, build_insert_row
from app.services.ingest.registry import PlatformConfig, DedupFieldSpec
# ─── 辅助工厂函数 ─────────────────────────────────────
def _make_config_single(table: str = "boss_job") -> PlatformConfig:
return PlatformConfig(
platform="boss", channel="mini", data_type="job",
table=table,
dedup_fields=(DedupFieldSpec(column="job_id", extractor=lambda d: d.get("jobId")),),
)
def _make_config_double(table: str = "qcwy_job") -> PlatformConfig:
return PlatformConfig(
platform="qcwy", channel="mini", data_type="job",
table=table,
dedup_fields=(
DedupFieldSpec(column="job_id", extractor=lambda d: d.get("jobId")),
DedupFieldSpec(column="update_date_time", extractor=lambda d: d.get("updateDt")),
),
)
def _make_mock_client(rows: list) -> AsyncMock:
mock_result = MagicMock()
mock_result.result_rows = rows
mock_client = AsyncMock()
mock_client.query.return_value = mock_result
return mock_client
def _make_rows(job_ids: list[str]) -> tuple[list[str], list[list]]:
columns = ["id", "json_data", "channel", "created_at", "updated_at", "job_id"]
values = [[0, "{}", "mini", None, None, jid] for jid in job_ids]
return columns, values
def _make_double_rows(pairs: list[tuple[str, str]]) -> tuple[list[str], list[list]]:
columns = ["id", "json_data", "channel", "created_at", "updated_at", "job_id", "update_date_time"]
values = [[0, "{}", "mini", None, None, jid, udt] for jid, udt in pairs]
return columns, values
def _run(coro):
return asyncio.get_event_loop().run_until_complete(coro)
# ─── 测试:单字段去重 ─────────────────────────────────
def test_single_field_dedup_within_30_days():
"""30 天内有相同 job_id → 视为重复,过滤"""
mock_client = _make_mock_client([("JOB001",)]) # 已存在
config = _make_config_single()
columns, values = _make_rows(["JOB001", "JOB002"])
rows = [{"jobId": "JOB001"}, {"jobId": "JOB002"}]
filtered, ignored = _run(batch_dedup_filter(mock_client, config, rows, columns, values))
assert ignored == 1
assert len(filtered) == 1
assert filtered[0][5] == "JOB002"
# 验证 SQL 包含 30 天窗口
sql = mock_client.query.call_args[0][0]
assert "INTERVAL 30 DAY" in sql, f"SQL 应包含 INTERVAL 30 DAY实际: {sql}"
def test_single_field_dedup_no_existing():
"""无现有记录 → 允许全部入库"""
mock_client = _make_mock_client([])
config = _make_config_single()
columns, values = _make_rows(["JOB_NEW_1", "JOB_NEW_2"])
rows = [{"jobId": "JOB_NEW_1"}, {"jobId": "JOB_NEW_2"}]
filtered, ignored = _run(batch_dedup_filter(mock_client, config, rows, columns, values))
assert ignored == 0
assert len(filtered) == 2
def test_single_field_dedup_sql_has_30_day_window():
"""验证单字段 dedup SQL 中包含 30 天时间窗口"""
mock_client = _make_mock_client([])
config = _make_config_single()
columns, values = _make_rows(["JOB_X"])
rows = [{"jobId": "JOB_X"}]
_run(batch_dedup_filter(mock_client, config, rows, columns, values))
assert mock_client.query.called
sql = mock_client.query.call_args[0][0]
assert "INTERVAL 30 DAY" in sql
# ─── 测试:双字段去重 ─────────────────────────────────
def test_double_field_dedup_within_30_days():
"""30 天内双字段都匹配 → 视为重复,过滤"""
mock_client = _make_mock_client([("JOB001", "2026-03-01")]) # 已存在
config = _make_config_double()
columns, values = _make_double_rows([("JOB001", "2026-03-01"), ("JOB002", "2026-03-10")])
rows = [{"jobId": "JOB001", "updateDt": "2026-03-01"}, {"jobId": "JOB002", "updateDt": "2026-03-10"}]
filtered, ignored = _run(batch_dedup_filter(mock_client, config, rows, columns, values))
assert ignored == 1
assert len(filtered) == 1
sql = mock_client.query.call_args[0][0]
assert "INTERVAL 30 DAY" in sql
def test_dedup_empty_input():
"""空 all_values → 直接返回,不调用 ClickHouse"""
mock_client = AsyncMock()
config = _make_config_single()
columns = ["id", "json_data", "channel", "created_at", "updated_at", "job_id"]
filtered, ignored = _run(batch_dedup_filter(mock_client, config, [], columns, []))
assert ignored == 0
assert filtered == []
mock_client.query.assert_not_called()
# ─── 测试build_insert_row ────────────────────────────
def test_build_insert_row_has_channel():
"""build_insert_row 生成的列中包含 channel 和 job_id"""
config = _make_config_single()
data = {"jobId": "JOB999"}
columns, values = build_insert_row(config, data, "mini")
assert "channel" in columns
channel_idx = columns.index("channel")
assert values[channel_idx] == "mini"
assert "job_id" in columns
job_id_idx = columns.index("job_id")
assert values[job_id_idx] == "JOB999"