调用链:company_cleaning_job → sync_company_jobs → store_batch(channel="company")。
此前 channel="company" 的 job 配置没有 push_mapper,导致:
- 不会生成 push_data_list,因此不会调用 push_to_remote;
- 不会触发 company_desc 补全。
修复:为三个平台的 channel="company" 配置加上对应的 push_mapper。
83 lines
3.1 KiB
Python
83 lines
3.1 KiB
Python
from typing import Dict, Any, Optional
|
|
|
|
from app.services.ingest.registry import PlatformConfig, DedupFieldSpec, register
|
|
from app.services.ingest.remote_push import safe_get, safe_join
|
|
|
|
|
|
def _extract_number(data: Dict[str, Any]) -> Optional[str]:
|
|
val = data.get("number")
|
|
return str(val) if val else None
|
|
|
|
|
|
def _extract_fpt(data: Dict[str, Any]) -> Optional[str]:
|
|
val = data.get("firstPublishTime")
|
|
return str(val) if val else None
|
|
|
|
|
|
def _extract_company_name(data: Dict[str, Any]) -> Optional[str]:
|
|
name = data.get("companyName") or data.get("name")
|
|
return str(name) if name else None
|
|
|
|
|
|
def _build_zhilian_push(data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Map a raw Zhilian job record onto the flat push payload.

    Args:
        data: Raw job record. Fields are read by their original keys
            (``companyName``, ``name``, ``jobSummary``, ``skillLabel``, ...).

    Returns:
        A dict with the push fields filled from ``data``; fields that have
        no counterpart in the record are set to ``""``. The raw record is
        attached unchanged under ``base_data``. This implementation always
        returns a dict; the ``Optional`` annotation is kept for callers.
    """
    # ``skillLabel`` entries are only used when they are dicts that carry
    # a ``value`` key; anything else in the list is skipped.
    skill_labels = data.get("skillLabel") or []
    skill_values = [
        tag["value"]
        for tag in skill_labels
        if isinstance(tag, dict) and "value" in tag
    ]

    # Values reused by several output keys are looked up once.
    # NOTE(review): assumes safe_get is a side-effect-free lookup - confirm.
    company_name = safe_get(data, "companyName")
    job_title = safe_get(data, "name")
    work_area = f"{safe_get(data, 'workCity')}{safe_get(data, 'cityDistrict')}"

    return {
        "source_type": "智联招聘",
        "name": company_name,
        "common_name": company_name,
        "title": job_title,
        "title_addr": job_title,
        "description": safe_get(data, "jobSummary"),
        "education": safe_get(data, "education"),
        "skill": safe_join(skill_values),
        "welfare": "",
        "years": safe_get(data, "workingExp"),
        "salary": safe_get(data, "salary60"),
        "location": work_area,
        "position": work_area,
        "job_type": safe_get(data, "workType"),
        "size": safe_get(data, "companySize"),
        "employer_type": safe_get(data, "propertyName"),
        "industry": safe_get(data, "industryName"),
        "job_1st_class": "",
        "job_2nd_class": "",
        "job_3rd_class": "",
        "job_4th_class": "",
        "date": safe_get(data, "firstPublishTime"),
        "start_date": "",
        "end_date": "",
        "age": "",
        "sex": "",
        "number": str(safe_get(data, "recruitNumber")),
        "url": safe_get(data, "positionURL"),
        "company_id": str(safe_get(data, "companyId")),
        "company_name": company_name,
        "company_url": safe_get(data, "companyUrl"),
        "company_desc": safe_get(data, "companyDesc"),
        "base_data": data,
    }
|
|
|
|
|
|
# Zhilian job records on the "mini" channel, stored in ``zhilian_job``.
register(
    PlatformConfig(
        platform="zhilian",
        channel="mini",
        data_type="job",
        table="zhilian_job",
        # Columns (with their extractors) handed to the registry as the
        # de-duplication fields for this table.
        dedup_fields=(
            DedupFieldSpec(column="number", extractor=_extract_number),
            DedupFieldSpec(column="first_publish_time", extractor=_extract_fpt),
        ),
        push_mapper=_build_zhilian_push,
    )
)
|
|
|
|
# Zhilian company records on the "mini" channel, stored in
# ``zhilian_company``. No push_mapper is configured for this entry.
register(
    PlatformConfig(
        platform="zhilian",
        channel="mini",
        data_type="company",
        table="zhilian_company",
        dedup_fields=(
            DedupFieldSpec(column="company_name", extractor=_extract_company_name),
        ),
    )
)
|
|
|
|
# Jobs associated with a company (written via company_jobs_sync; kept
# distinct from the search-job "mini" channel). Uses the same table,
# de-duplication fields and push_mapper as the "mini" job entry.
register(
    PlatformConfig(
        platform="zhilian",
        channel="company",
        data_type="job",
        table="zhilian_job",
        dedup_fields=(
            DedupFieldSpec(column="number", extractor=_extract_number),
            DedupFieldSpec(column="first_publish_time", extractor=_extract_fpt),
        ),
        push_mapper=_build_zhilian_push,
    )
)
|