feat(03): migrate job51+zhilian to crawler_core (ARCH-04/05)

job51 (spiderJobs/platforms/job51/):
- client.py: HTTPClient+Job51Sign from crawler_core
- api.py: ApiResult→Result, self._http→self.http_client, _request() POST overrides
- main.py: BaseFetcher/BaseSearcher from crawler_core
- sign.py: backward-compatible stub re-exporting crawler_core.qcwy.sign.Job51Sign

zhilian (spiderJobs/platforms/zhilian/):
- client.py: HTTPClient+ZhilianSign from crawler_core
- api.py: add _parse_zhilian_response (HTTP 200=success), add _parse()/_request()
  to all classes (GET fetchers + POST searcher overrides)
- main.py: BaseFetcher/BaseSearcher from crawler_core
- sign.py: backward-compatible stub re-exporting crawler_core.zhilian.sign.ZhilianSign

tests: 34 new mock tests (17 job51 + 17 zhilian)
Full regression: 98 passed (job51:17 + zhilian:17 + boss:22 + crawler_core:41 + 1)
This commit is contained in:
win 2026-03-21 19:18:22 +08:00
parent 024c2bcd49
commit 8c2c2d29d7
12 changed files with 1518 additions and 0 deletions

View File

@ -0,0 +1,306 @@
"""
前程无忧 (51Job) - 所有 API 接口
每个类只负责参数构建HTTP 和算法由 client / core 层处理
响应格式适配:
51job 使用 status/data 或直接返回数据
status=1 HTTP 200 表示成功
"""
from __future__ import annotations
from typing import Any, Optional
from crawler_core.base import BaseFetcher, BaseSearcher, Result
from spiderJobs.platforms.job51.client import Job51Client, create_client
# ─────────────────────────────────────────────
# 51job 响应解析(覆写默认算法)
# ─────────────────────────────────────────────
def _parse_job51_response(http_code: int, raw: Any) -> Result:
    """Convert a raw 51job response into a ``Result``.

    Expected response shape (cupid endpoints)::

        {"status": 1, "message": "成功", "resultbody": {...}}

    ``status`` equal to ``1`` or ``"1"`` means success, and ``resultbody``
    (falling back to ``data``) carries the business payload.

    Args:
        http_code: HTTP status code of the response.
        raw: Decoded response body; anything other than a dict is rejected.

    Returns:
        A failed ``Result`` for non-200 responses, non-dict bodies and
        business errors; otherwise a successful ``Result``. When the payload
        contains a recognised list (``jobList.items``, ``jobList`` as a list,
        ``items`` or ``list``) the ``list`` / ``count`` / ``is_end_page``
        fields are filled in as well.
    """
    if http_code != 200:
        return Result(
            success=False,
            status_code=http_code,
            error=f"HTTP 请求失败: {http_code}",
        )
    if not isinstance(raw, dict):
        return Result(success=False, status_code=http_code, error="响应格式异常")

    # Business status may arrive as int 1 or str "1"; a missing status is
    # not treated as an error.
    biz_status = raw.get("status")
    if biz_status is not None and str(biz_status) != "1":
        return Result(
            success=False,
            status_code=int(biz_status) if str(biz_status).isdigit() else -1,
            error=raw.get("message") or f"业务错误: {biz_status}",
        )

    payload = raw.get("resultbody") or raw.get("data") or {}
    if not isinstance(payload, dict):
        return Result(success=True, status_code=200, data=payload)

    # Recommended jobs: resultbody.jobList.items[] or resultbody.jobList[]
    if "jobList" in payload:
        job_list_wrap = payload.get("jobList")
        if isinstance(job_list_wrap, dict) and "items" in job_list_wrap:
            return _job51_list_result(payload, job_list_wrap.get("items"))
        if isinstance(job_list_wrap, list):
            return _job51_list_result(payload, job_list_wrap)

    # Company jobs: resultbody.items[] with an optional totalCount
    if "items" in payload:
        return _job51_list_result(
            payload, payload.get("items"), payload.get("totalCount")
        )

    # Generic list field
    if "list" in payload:
        return _job51_list_result(payload, payload.get("list"))

    return Result(success=True, status_code=200, data=payload)


def _job51_list_result(payload: dict, items: Any, count: Any = None) -> Result:
    """Build a successful list-type ``Result``, treating a null list as empty."""
    if items is None:
        # dict.get's default only covers a missing key; an explicit JSON null
        # would otherwise reach len() and raise TypeError.
        items = []
    if count is None:
        count = len(items)
    return Result(
        success=True, status_code=200, data=payload,
        list=items,
        count=count,
        is_end_page=len(items) == 0,
    )
# ─────────────────────────────────────────────
# 1. 首页推荐职位搜索POST
# ─────────────────────────────────────────────
class SearchRecommendJobs(BaseSearcher):
    """Home-page recommended / searchable job list (no login required).

    Usage::

        api = SearchRecommendJobs(job_area="020000", function_type="A0N7")
        result = api.search()
        all_jobs = api.load_all(max_pages=5)
    """

    ENDPOINT = "open/noauth/recommend/job-tab-dynamic-wx-mini"

    def __init__(
        self,
        *,
        job_area: str = "020000",
        function_type: str = "",
        job_type: str = "recommend",
        page_size: int = 10,
        client: Optional[Job51Client] = None,
    ):
        """Store the search filters; a default client is created when none is given."""
        http = client or create_client()
        super().__init__(page_size=page_size, http_client=http)
        self.job_area = job_area
        self.function_type = function_type
        self.job_type = job_type

    def _build_params(self, page_index: int) -> dict:
        """Return the POST body for the requested page."""
        payload = dict(
            pageNo=page_index,
            pageSize=self.page_size,
            specialPageCode=True,
            isTouristMode=True,
            type=self.job_type,
            jobArea=self.job_area,
            personAsLabel="1",
        )
        # functionType is only sent when a value was supplied.
        if self.function_type:
            payload["functionType"] = self.function_type
        return payload

    def _request(self, params: dict):
        """Send the search as a POST through the injected client."""
        client = self.http_client
        return client.post(self.ENDPOINT, params)

    def _parse(self, http_code: int, raw: Any) -> Result:
        """Delegate to the 51job-specific response parser."""
        return _parse_job51_response(http_code, raw)
# ─────────────────────────────────────────────
# 2. 职位详情GET
# ─────────────────────────────────────────────
class GetJobDetail(BaseFetcher):
    """Job detail lookup (no login required).

    Usage::

        detail = GetJobDetail(job_id="170651439").fetch()
    """

    ENDPOINT = "open/noauth/jobs/detail/base"

    def __init__(self, *, job_id: str, client: Optional[Job51Client] = None):
        """Remember the job id; a default client is created when none is given."""
        http = client or create_client()
        super().__init__(http_client=http)
        self.job_id = job_id

    def _build_params(self) -> dict:
        """No query parameters: the job id travels in the URL path instead."""
        return {}

    def fetch(self) -> Result:
        """Fetch the detail, appending ``job_id`` to the endpoint path.

        Any exception raised by the HTTP call is converted into a failed
        ``Result`` with ``status_code=-1``.
        """
        target = "{}/{}".format(self.ENDPOINT, self.job_id)
        try:
            response = self.http_client.get(target)
        except Exception as exc:
            return Result(success=False, status_code=-1, error=str(exc))
        http_code, data = response
        return self._parse(http_code, data)

    def _parse(self, http_code: int, raw: Any) -> Result:
        """Delegate to the 51job-specific response parser."""
        return _parse_job51_response(http_code, raw)
# ─────────────────────────────────────────────
# 3. 公司详情GET
# ─────────────────────────────────────────────
class GetCompanyInfo(BaseFetcher):
    """Company detail lookup (no login required).

    Usage::

        detail = GetCompanyInfo(company_id="9825088").fetch()
    """

    ENDPOINT = "open/noauth/company-info/info-data"

    def __init__(
        self,
        *,
        company_id: str,
        color_one: str = "#ffffff",
        color_two: str = "#ffffffcc",
        client: Optional[Job51Client] = None,
    ):
        """Store the company id and the two colour query values."""
        http = client or create_client()
        super().__init__(http_client=http)
        self.company_id = company_id
        self.color_one = color_one
        self.color_two = color_two

    def _build_params(self) -> dict:
        """Return the query parameters sent with the GET request."""
        query = {}
        query["companyId"] = self.company_id
        query["colorOne"] = self.color_one
        query["colorTwo"] = self.color_two
        return query

    def fetch(self) -> Result:
        """Fetch the company info, passing the query parameters to the client.

        Any exception raised by the HTTP call is converted into a failed
        ``Result`` with ``status_code=-1``.
        """
        try:
            response = self.http_client.get(self.ENDPOINT, self._build_params())
        except Exception as exc:
            return Result(success=False, status_code=-1, error=str(exc))
        http_code, data = response
        return self._parse(http_code, data)

    def _parse(self, http_code: int, raw: Any) -> Result:
        """Delegate to the 51job-specific response parser."""
        return _parse_job51_response(http_code, raw)
# ─────────────────────────────────────────────
# 4. 公司职位列表POST
# ─────────────────────────────────────────────
class SearchCompanyJobs(BaseSearcher):
    """Paged list of a company's open positions (no login required).

    Usage::

        api = SearchCompanyJobs(company_id="9825088")
        result = api.search()
        all_jobs = api.load_all(max_pages=3)
    """

    ENDPOINT = "open/noauth/jobs/company"

    def __init__(
        self,
        *,
        company_id: str,
        job_area: str = "",
        function: str = "",
        salary_type: str = "",
        page_size: int = 10,
        client: Optional[Job51Client] = None,
    ):
        """Store the company id and optional filters."""
        http = client or create_client()
        super().__init__(page_size=page_size, http_client=http)
        self.company_id = company_id
        self.job_area = job_area
        self.function = function
        self.salary_type = salary_type

    def _build_params(self, page_index: int) -> dict:
        """Return the POST body for the requested page."""
        payload = dict(
            pageNum=page_index,
            pageSize=self.page_size,
            coId=self.company_id,
            jobArea=self.job_area,
            function=self.function,
            salaryType=self.salary_type,
            scene=14,
            requestId="",
        )
        return payload

    def _request(self, params: dict):
        """Send the company-jobs search as a POST through the injected client."""
        client = self.http_client
        return client.post(self.ENDPOINT, params)

    def _parse(self, http_code: int, raw: Any) -> Result:
        """Delegate to the 51job-specific response parser."""
        return _parse_job51_response(http_code, raw)
# ─────────────────────────────────────────────
# 使用示例
# ─────────────────────────────────────────────
if __name__ == "__main__":
    # Manual smoke run against the live API: exercises each endpoint once.
    import json

    print("=== 1. 首页推荐职位 ===")
    r = SearchRecommendJobs(job_area="020000").search()
    print(f"成功: {r.success}, 本页 {len(r.list)} 条, is_end_page: {r.is_end_page}")
    if r.list:
        first_dump = json.dumps(r.list[0], ensure_ascii=False, indent=2)
        print(f"第一条: {first_dump[:300]}...")

    print("\n=== 2. 公司详情 ===")
    r = GetCompanyInfo(company_id="9825088").fetch()
    print(f"成功: {r.success}")
    if r.data:
        company_dump = json.dumps(r.data, ensure_ascii=False, indent=2)
        print(f"数据: {company_dump[:300]}...")

    print("\n=== 3. 公司职位列表 ===")
    r = SearchCompanyJobs(company_id="9825088").search()
    print(f"成功: {r.success}, 本页 {len(r.list)}")

    # Job detail: take the job id from a fresh search result.
    print("\n=== 4. 职位详情 ===")
    search_r = SearchRecommendJobs(job_area="020000").search()
    if not search_r.list:
        print("搜索结果为空,跳过")
    else:
        first_job = search_r.list[0]
        job_id = str(first_job.get("jobId", "") or first_job.get("id", ""))
        if not job_id:
            print("搜索结果中未找到 jobId 字段")
        else:
            r = GetJobDetail(job_id=job_id).fetch()
            print(f"成功: {r.success}")
            if r.data:
                detail_dump = json.dumps(r.data, ensure_ascii=False, indent=2)
                print(f"数据: {detail_dump[:300]}...")

View File

@ -0,0 +1,169 @@
"""
前程无忧 (51Job) HTTP 客户端
在通用 HTTPClient 上叠加 51job 特有的 sign 签名和默认 headers
Boss/智联不同51job sign 依赖完整的 URL path + body
因此需要在 post/get 方法中先构造签名再拼接最终 URL
"""
from __future__ import annotations
import json
from typing import Any, Optional
from urllib.parse import quote
from crawler_core.http_client import HTTPClient
from crawler_core.qcwy.sign import Job51Sign
# Base URL handed to HTTPClient; Job51Client._post_raw prefixes request paths with it.
BASE_URL = "https://cupid.51job.com"
# Default request headers specific to the 51job mini-program.
# They are passed to HTTPClient as ``default_headers``; per-request signature
# headers are added separately by Job51Client.
JOB51_HEADERS = {
"user-agent": (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36 "
"MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI "
"MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.7(0x13080712) "
"UnifiedPCMacWechat(0xf2641702) XWEB/18788"
),
"xweb_xhr": "1",
"from-domain": "51job_weixin_wxapp",
"sec-fetch-site": "cross-site",
"sec-fetch-mode": "cors",
"sec-fetch-dest": "empty",
"referer": "https://servicewechat.com/wx1131e5c71e668b5d/426/page-frame.html",
"accept-language": "zh-CN,zh;q=0.9",
"priority": "u=1, i",
}
class Job51Client(HTTPClient):
    """51Job HTTP client.

    Extends the generic HTTPClient so that every request carries a freshly
    computed signature (described as HMAC-SHA256 in the original notes).

    Args:
        signer: Optional Job51Sign instance; a new one is created when omitted.
        tunnel_proxy: Tunnel proxy address (per the original notes, the exit
            IP rotates on every request).
        proxy: Fixed proxy address.
        proxy_pool: List of proxy addresses.
        timeout: Request timeout in seconds.
    """

    def __init__(
        self,
        signer: Optional[Job51Sign] = None,
        tunnel_proxy: Optional[str] = None,
        proxy: Optional[str] = None,
        proxy_pool: Optional[list[str]] = None,
        timeout: int = 10,
    ):
        base_options = dict(
            base_url=BASE_URL,
            default_headers=JOB51_HEADERS,
            tunnel_proxy=tunnel_proxy,
            proxy=proxy,
            proxy_pool=proxy_pool,
            timeout=timeout,
        )
        super().__init__(**base_options)
        self.signer = signer or Job51Sign()
        # One uuid per client instance, reused in every request's headers.
        self._uuid = Job51Sign.generate_uuid()

    def _job51_headers(self, sign: str) -> dict:
        """Build the 51job-specific headers attached to each request."""
        tracking = {
            "frompageUrl": "",
            "pageUrl": "pages/index/index",
            "isLogin": "",
            "accountid": "",
            "resumeId": "",
            "firstFrompageUrl": "",
            "distinct_id": self._uuid,
        }
        # The "property" header is compact JSON, fully percent-encoded.
        tracking_json = json.dumps(tracking, ensure_ascii=False, separators=(",", ":"))
        return {
            "sign": sign,
            "partner": "",
            "property": quote(tracking_json, safe=""),
            "uuid": self._uuid,
            "user-token": "",
            "account-id": "",
        }

    def post(self, path: str, body: dict, headers: Optional[dict] = None) -> tuple[int, Any]:
        """POST with an automatically computed signature.

        ``path`` is an endpoint such as
        ``open/noauth/recommend/job-tab-dynamic-wx-mini``; the signer returns
        the final URL path to request (per the original notes, of the form
        ``/endpoint?api_key=51job&timestamp=xxx``).

        The body must go out as compact JSON (no spaces) so that it matches
        the signed string exactly; that is why the body is pre-serialised
        here instead of relying on the ``json=`` request parameter.
        """
        signed_path, signature = self.signer.build_sign_path(path, "POST", body=body)
        request_headers = self._job51_headers(signature)
        request_headers["Content-Type"] = "application/json"
        if headers:
            request_headers.update(headers)
        compact_body = json.dumps(body, ensure_ascii=False, separators=(",", ":"))
        return self._post_raw(signed_path, compact_body, request_headers)

    def _post_raw(self, path: str, raw_body: str, headers: dict) -> tuple[int, Any]:
        """Send a pre-serialised POST body (``data=`` rather than ``json=``)."""
        merged_headers = self._merge_headers(headers)
        url = f"{self.base_url}{path}"

        if not self._tunnel_proxy:
            # Regular path: reuse the shared session, adding proxies only
            # when the base client supplies some.
            request_kwargs: dict[str, Any] = {
                "data": raw_body.encode("utf-8"),
                "headers": merged_headers,
                "timeout": self.timeout,
            }
            proxies = self._get_proxies()
            if proxies:
                request_kwargs["proxies"] = proxies
            resp = self._session.post(url, **request_kwargs)
            return resp.status_code, resp.json()

        # Tunnel-proxy path: a throwaway session per request, always closed.
        # NOTE(review): the imported name is never referenced in this method;
        # kept because removing it could change behaviour - confirm.
        import requests_go as requests
        tunnel_session = self._new_session()
        try:
            tunnel = self._tunnel_proxy
            resp = tunnel_session.post(
                url,
                data=raw_body.encode("utf-8"),
                headers=merged_headers,
                proxies={"http": tunnel, "https": tunnel},
                timeout=self.timeout,
            )
            return resp.status_code, resp.json()
        finally:
            tunnel_session.close()

    def get(self, path: str, params: Optional[dict] = None, headers: Optional[dict] = None) -> tuple[int, Any]:
        """GET with an automatically computed signature.

        ``params`` are handed to the signer, which encodes them into the
        signed path's query string; they are therefore not passed on to the
        underlying GET call again.
        """
        signed_path, signature = self.signer.build_sign_path(path, "GET", params=params)
        request_headers = self._job51_headers(signature)
        request_headers["content-type"] = "application/x-www-form-urlencoded"
        if headers:
            request_headers.update(headers)
        return super().get(signed_path, params=None, headers=request_headers)
def create_client(
    signer: Optional[Job51Sign] = None,
    tunnel_proxy: Optional[str] = None,
    proxy: Optional[str] = None,
    proxy_pool: Optional[list[str]] = None,
) -> Job51Client:
    """Create a 51job client; the timeout keeps Job51Client's default."""
    options = {
        "signer": signer,
        "tunnel_proxy": tunnel_proxy,
        "proxy": proxy,
        "proxy_pool": proxy_pool,
    }
    return Job51Client(**options)

View File

@ -0,0 +1,116 @@
"""
前程无忧 (51Job) 小程序爬虫入口
功能:
1. 从后端获取关键词优先断点续爬 > 失败重试 > 全新
2. 调用 SearchRecommendJobs 分页爬取职位列表
3. 每页实时上传数据 + 汇报进度
4. 支持从断点页码恢复
5. 可选搜索 job 时顺带抓取公司详情
启动:
python -m spiderJobs.platforms.job51.main
环境变量:
API_BASE_URL 后端地址 (默认 http://124.222.106.226:9999)
MAX_PAGES 每个关键词最大翻页数 (默认 3)
SLEEP_MIN_SECONDS 最小延迟秒数 (默认 10)
SLEEP_MAX_SECONDS 最大延迟秒数 (默认 20)
INLINE_COMPANY 是否内联抓公司 (默认 1 0 关闭)
"""
from __future__ import annotations
import os
import sys
from typing import Optional
_project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", ".."))
if _project_root not in sys.path:
sys.path.insert(0, _project_root)
from crawler_core.base import BaseFetcher, BaseSearcher
from spiderJobs.platforms.job51.api import GetCompanyInfo, SearchRecommendJobs
from spiderJobs.platforms.job51.client import Job51Client, create_client
from spiderJobs.runner.loop import run_crawl_loop
# 51job city-code mapping: Chinese city name -> code string used as ``job_area``.
# create_searcher falls back to "020000" (the 上海 entry) for unknown names.
CITY_CODE_MAP = {
"全国": "000000",
"北京": "010000",
"上海": "020000",
"广州": "030200",
"深圳": "040000",
"杭州": "080200",
"成都": "090200",
"南京": "070200",
"武汉": "180200",
"西安": "200200",
"长沙": "190200",
"重庆": "060000",
"苏州": "070300",
"天津": "050000",
"厦门": "110300",
"郑州": "170200",
"合肥": "150200",
"济南": "120200",
"青岛": "120300",
"大连": "230300",
"东莞": "030800",
"佛山": "030600",
"珠海": "030500",
"无锡": "070400",
"宁波": "080300",
}
def create_searcher(keyword: dict, http_client: Job51Client) -> BaseSearcher:
    """Build the 51job searcher for one keyword record.

    Only the ``city`` entry of ``keyword`` is read here; names missing from
    CITY_CODE_MAP fall back to area code "020000".
    """
    city_name = keyword.get("city", "")
    area_code = CITY_CODE_MAP.get(city_name, "020000")
    return SearchRecommendJobs(job_area=area_code, client=http_client)
def extract_company_id(job: dict) -> Optional[str]:
    """Return the job's ``coId`` as a string, or None when it is missing or falsy."""
    raw_id = job.get("coId")
    if not raw_id:
        return None
    return str(raw_id)
def create_company_fetcher(company_id: str, http_client: Job51Client) -> BaseFetcher:
    """Build the 51job company-detail fetcher bound to the given client."""
    fetcher = GetCompanyInfo(company_id=company_id, client=http_client)
    return fetcher
def main():
    """Read the tunnel-proxy settings from the environment and start the crawl loop.

    PROXY_TUNNEL enables the tunnel proxy; PROXY_SCHEME (default "http"),
    PROXY_USERNAME and PROXY_PASSWORD refine it. Credentials are only
    embedded when both username and password are set.
    """
    env = os.environ
    options = {}
    tunnel_host = env.get("PROXY_TUNNEL", "")
    if tunnel_host:
        scheme = env.get("PROXY_SCHEME", "http")
        user = env.get("PROXY_USERNAME", "")
        secret = env.get("PROXY_PASSWORD", "")
        # NOTE(review): credentials are inserted verbatim; values containing
        # "@", ":" or "/" would need percent-encoding - confirm with deployers.
        credentials = f"{user}:{secret}@" if user and secret else ""
        options["tunnel_proxy"] = f"{scheme}://{credentials}{tunnel_host}"

    run_crawl_loop(
        platform="qcwy",
        create_searcher=create_searcher,
        create_client_fn=create_client,
        max_pages=3,
        data_type="job",
        client_kwargs=options,
        extract_company_id=extract_company_id,
        create_company_fetcher=create_company_fetcher,
    )
if __name__ == "__main__":
main()

View File

@ -0,0 +1,10 @@
"""
向后兼容桩 前程无忧 (51Job) 签名
已迁移至 crawler_core.qcwy.sign
直接从 crawler_core 重新导出避免下游代码出现 ImportError
"""
from crawler_core.qcwy.sign import Job51Sign # noqa: F401
__all__ = ["Job51Sign"]

View File

@ -0,0 +1,281 @@
"""
智联招聘 - 所有 API 接口
每个类只负责参数构建HTTP 和算法由 client / core 层处理
"""
from __future__ import annotations
from typing import Any, Optional
from crawler_core.base import BaseFetcher, BaseSearcher, parse_response, Result
# ─────────────────────────────────────────────
# 智联响应解析(覆写默认算法)
# ─────────────────────────────────────────────
def _parse_zhilian_response(http_code: int, raw: Any) -> Result:
    """Convert a raw Zhilian response into a ``Result``.

    Expected response shape (cgate / capi endpoints)::

        {"data": {...}}   or   {"data": {"list": [...]}}

    Any HTTP 200 response whose body is a dict is treated as success; no
    business status field is inspected here.

    Args:
        http_code: HTTP status code of the response.
        raw: Decoded response body; anything other than a dict is rejected.

    Returns:
        A failed ``Result`` for non-200 responses and non-dict bodies;
        otherwise a successful ``Result``. When ``data`` contains a ``list``
        key, ``list`` / ``count`` / ``is_end_page`` are filled in, with
        ``count`` taken from ``pageInfo.numFound``, then ``data.numFound``,
        then the length of the page.
    """
    if http_code != 200:
        return Result(success=False, status_code=http_code,
                      error=f"HTTP 请求失败: {http_code}")
    if not isinstance(raw, dict):
        return Result(success=False, status_code=http_code, error="响应格式异常")

    payload = raw.get("data") or {}

    # List-type response
    if isinstance(payload, dict) and "list" in payload:
        items = payload.get("list")
        if items is None:
            # An explicit JSON null would otherwise reach len() and raise.
            items = []

        page_info = raw.get("pageInfo")
        if not isinstance(page_info, dict):
            # dict.get's default does not apply when the key holds null.
            page_info = {}

        num_found = page_info.get("numFound") or payload.get("numFound")
        if num_found is None:
            num_found = len(items)

        return Result(
            success=True, status_code=200, data=payload,
            list=items,
            count=num_found,
            is_end_page=len(items) == 0,
        )

    return Result(success=True, status_code=200, data=payload)
from spiderJobs.platforms.zhilian.client import ZhilianClient, create_cgate_client, create_capi_client
# ─────────────────────────────────────────────
# 1. 职位搜索POST cgate
# ─────────────────────────────────────────────
# Fixed fields included in every SearchPositions request body; paging and
# search filters are layered on top in SearchPositions._build_params.
_SEARCH_BODY = {
"eventScenario": "wxmpZhaopinSearchV2",
"filterMinSalary": 1,
"S_SOU_EXPAND": "SOU_COMPANY_ID",
"sortType": "DEFAULT",
"resumeNumber": "",
"version": "8.11.22",
"identity": 0,
"anonymous": 1,
}
# Filter keys that SearchPositions copies from its ``filters`` argument into
# the request body when the supplied value is truthy.
_FILTER_KEYS = [
"S_SOU_SALARY", "S_SOU_EDUCATION_LOWESTLEVEL", "S_SOU_REFRESH_DATE",
"S_SOU_WORK_EXPERIENCE", "S_SOU_POSITION_TYPE", "S_SOU_COMPANY_TYPE",
"S_SOU_COMPANY_SCALE", "welfareLabels", "S_SOU_JD_INDUSTRY_LEVEL",
]
class SearchPositions(BaseSearcher):
    """Job search (POST).

    Usage::

        api = SearchPositions(keyword="Python", city_code=538)
        result = api.search()
        all_jobs = api.load_all(max_pages=5)
    """

    ENDPOINT = "/positionbusiness/searchrecommend/searchPositions"

    def __init__(
        self,
        *,
        keyword: str = "",
        city_code: int | str = "",
        collected_purpose: Optional[dict] = None,
        filters: Optional[dict] = None,
        page_size: int = 15,
        client: Optional[ZhilianClient] = None,
    ):
        """Store the search criteria; a cgate client is created when none is given."""
        http = client or create_cgate_client()
        super().__init__(page_size=page_size, http_client=http)
        self.keyword = keyword
        self.city_code = city_code
        self.collected_purpose = collected_purpose
        self.filters = filters or {}

    def _build_params(self, page_index: int) -> dict:
        """Return the POST body for the requested page.

        Values derived from ``collected_purpose`` win over ``keyword`` (when a
        job-level code was set) and over ``city_code`` (when a city was set).
        """
        body = dict(_SEARCH_BODY)
        body["pageIndex"] = page_index
        body["pageSize"] = self.page_size

        if self.collected_purpose:
            purpose_fields = self._purpose_params(self.collected_purpose, page_index)
            body.update(purpose_fields)

        if self.keyword and "S_SOU_JD_JOB_LEVEL3" not in body:
            body["S_SOU_FULL_INDEX"] = self.keyword
        if self.city_code and "S_SOU_WORK_CITY" not in body:
            body["S_SOU_WORK_CITY"] = self.city_code

        # Copy over only the recognised filters that carry a truthy value.
        for key in _FILTER_KEYS:
            if self.filters.get(key):
                body[key] = self.filters[key]
        return body

    def _request(self, params: dict):
        """Send the job search as a POST through the injected client."""
        client = self.http_client
        return client.post(self.ENDPOINT, params)

    def _parse(self, http_code: int, raw) -> "Result":
        """Delegate to the Zhilian-specific response parser."""
        return _parse_zhilian_response(http_code, raw)

    @staticmethod
    def _purpose_params(purpose: dict, page_index: int) -> dict:
        """Translate a job-seeking purpose record into search parameters."""
        params: dict = {"pageIndex": page_index}

        level3_code = purpose.get("pnew_preferred_job_type", "")
        type_name = purpose.get("job_type_name", "")
        if level3_code:
            params["S_SOU_JD_JOB_LEVEL3"] = level3_code
        elif type_name:
            params["S_SOU_FULL_INDEX"] = type_name

        location = purpose.get("city_id", "") or purpose.get("preferred_location", "")
        if location:
            params["S_SOU_WORK_CITY"] = location

        sal_min = purpose.get("preferred_salary_min", "")
        sal_max = purpose.get("preferred_salary_max", "")
        # Skip the salary filter only when the minimum is unset ("" or "-1")
        # and the maximum is empty as well.
        if not (sal_min in ("", "-1") and sal_max == ""):
            params["S_SOU_SALARY"] = f"{sal_min},{sal_max}"
        return params
# ─────────────────────────────────────────────
# 2. 职位详情GET cgate
# ─────────────────────────────────────────────
class GetPositionDetail(BaseFetcher):
    """Job detail lookup.

    Usage::

        detail = GetPositionDetail(number="CC462451910J40881838003").fetch()
    """

    ENDPOINT = "/positionbusiness/position/getPositionModule"

    def __init__(self, *, number: str, identity: int = 0, client: Optional[ZhilianClient] = None):
        """Store the position number; a cgate client is created when none is given."""
        http = client or create_cgate_client()
        super().__init__(http_client=http)
        self.number = number
        self.identity = identity

    def _build_params(self) -> dict:
        """Return the query parameters for the detail request."""
        return dict(number=self.number, identity=self.identity, resumeNumber="")

    def _parse(self, http_code: int, raw) -> "Result":
        """Delegate to the Zhilian-specific response parser."""
        return _parse_zhilian_response(http_code, raw)
# ─────────────────────────────────────────────
# 3. 企查查工商信息GET cgate
# ─────────────────────────────────────────────
class GetCompanyExtDetail(BaseFetcher):
    """Business-registration (Qichacha) information for a company.

    Usage::

        detail = GetCompanyExtDetail(company_name="上海有大信息科技", company_number="CZ462451910").fetch()
    """

    ENDPOINT = "/riskstorm/company/getCompanyExtDetail"

    def __init__(self, *, company_name: str, company_number: str, client: Optional[ZhilianClient] = None):
        """Store the company name and number; a cgate client is created when none is given."""
        http = client or create_cgate_client()
        super().__init__(http_client=http)
        self.company_name = company_name
        self.company_number = company_number

    def _build_params(self) -> dict:
        """Return the query parameters for the lookup."""
        return dict(companyName=self.company_name, companyNumber=self.company_number)

    def _parse(self, http_code: int, raw) -> "Result":
        """Delegate to the Zhilian-specific response parser."""
        return _parse_zhilian_response(http_code, raw)
# ─────────────────────────────────────────────
# 4. 公司详细信息GET cgate
# ─────────────────────────────────────────────
class GetCompanyDetail(BaseFetcher):
    """Company detail lookup.

    Usage::

        detail = GetCompanyDetail(number="CZ462451910").fetch()
    """

    ENDPOINT = "/positionbusiness/exposure/companyDetail"

    def __init__(self, *, number: str, client: Optional[ZhilianClient] = None):
        """Store the company number; a cgate client is created when none is given."""
        http = client or create_cgate_client()
        super().__init__(http_client=http)
        self.number = number

    def _build_params(self) -> dict:
        """Return the query parameters for the lookup."""
        return dict(number=self.number)

    def _parse(self, http_code: int, raw) -> "Result":
        """Delegate to the Zhilian-specific response parser."""
        return _parse_zhilian_response(http_code, raw)
# ─────────────────────────────────────────────
# 5. 公司招聘职位列表GET capi
# ─────────────────────────────────────────────
class SearchCompanyPositions(BaseSearcher):
    """Paged list of a company's open positions (GET).

    Usage::

        api = SearchCompanyPositions(company_id="CZ462451910")
        result = api.search()
        all_jobs = api.load_all(max_pages=3)
    """

    ENDPOINT = "/capi/searchrecommend/searchPositionsCompany"

    def __init__(
        self,
        *,
        company_id: str,
        job_level: str = "",
        city_code: str = "",
        page_size: int = 30,
        client: Optional[ZhilianClient] = None,
    ):
        """Store the filters; a capi client is created when none is given."""
        # Kept on self._client too because _build_params reads its signer.
        self._client = client or create_capi_client()
        super().__init__(page_size=page_size, http_client=self._client)
        self.company_id = company_id
        self.job_level = job_level
        self.city_code = city_code

    def _build_params(self, page_index: int) -> dict:
        """Return the query parameters: signer-provided params first, then the search fields."""
        query = dict(self._client.signer.sign_params())
        query["S_SOU_COMPANY_ID"] = self.company_id
        query["S_SOU_POSITION_SOURCE_TYPE"] = "1"
        query["eventScenario"] = "wxmpZhaopinSearchPositionsCompany"
        query["pageCode"] = "wxmpZhaopinCompanyDetailPage"
        query["pageIndex"] = page_index
        query["pageSize"] = self.page_size
        # Optional filters are only sent when a value was supplied.
        if self.job_level:
            query["S_SOU_JD_JOB_LEVEL"] = self.job_level
        if self.city_code:
            query["S_SOU_WORK_CITY"] = self.city_code
        return query

    def _request(self, params: dict) -> tuple[int, Any]:
        """Send the search as a GET through the injected client."""
        client = self.http_client
        return client.get(self.ENDPOINT, params)

    def _parse(self, http_code: int, raw) -> "Result":
        """Delegate to the Zhilian-specific response parser."""
        return _parse_zhilian_response(http_code, raw)
# ─────────────────────────────────────────────
# 使用示例
# ─────────────────────────────────────────────
if __name__ == "__main__":
    # Manual smoke run against the live API: exercises each endpoint once.
    import json

    print("=== 1. 职位搜索 ===")
    r = SearchPositions(keyword="Python", city_code=538).search()
    print(f"{r.count} 条,本页 {len(r.list)}")

    print("\n=== 2. 职位详情 ===")
    r = GetPositionDetail(number="CC462451910J40881838003").fetch()
    print(f"成功: {r.success}")

    print("\n=== 3. 企查查信息 ===")
    r = GetCompanyExtDetail(
        company_name="上海有大信息科技",
        company_number="CZ462451910",
    ).fetch()
    print(f"成功: {r.success}")

    print("\n=== 4. 公司详情 ===")
    r = GetCompanyDetail(number="CZ462451910").fetch()
    print(f"成功: {r.success}")

    print("\n=== 5. 公司招聘列表 ===")
    r = SearchCompanyPositions(company_id="CZ462451910").search()
    print(f"{r.count} 个职位,本页 {len(r.list)}")

View File

@ -0,0 +1,98 @@
"""
智联招聘 HTTP 客户端
在通用 HTTPClient 上叠加智联特有的签名和默认 headers
"""
from __future__ import annotations
from typing import Any, Optional
from crawler_core.http_client import HTTPClient
from crawler_core.zhilian.sign import ZhilianSign
# Base URLs of the two Zhilian API hosts; create_cgate_client and
# create_capi_client each bind a ZhilianClient to one of them.
CGATE_BASE_URL = "https://cgate.zhaopin.com"
CAPI_BASE_URL = "https://capi.zhaopin.com"
# Default request headers specific to Zhilian (signature headers excluded;
# those are added per request by ZhilianClient.post / ZhilianClient.get).
ZHILIAN_HEADERS = {
"content-type": "application/json",
"user-agent": (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36 "
"MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI "
"MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.7(0x13080712) "
"UnifiedPCMacWechat(0xf2641702) XWEB/18788"
),
"accept": "*/*",
"sec-fetch-site": "cross-site",
"sec-fetch-mode": "cors",
"sec-fetch-dest": "empty",
"referer": "https://servicewechat.com/wxb7718fb9257e4fd2/647/page-frame.html",
"accept-language": "zh-CN,zh;q=0.9",
"accept-encoding": "identity",
}
class ZhilianClient(HTTPClient):
    """Zhilian HTTP client.

    Extends the generic HTTPClient so that signature headers are injected
    into every request.

    Args:
        base_url: API base address; defaults to the cgate host.
        signer: Optional ZhilianSign instance; a new one is created when omitted.
        proxy: Fixed proxy address.
        proxy_pool: List of proxy addresses.
        timeout: Request timeout in seconds.
    """

    def __init__(
        self,
        base_url: str = CGATE_BASE_URL,
        signer: Optional[ZhilianSign] = None,
        proxy: Optional[str] = None,
        proxy_pool: Optional[list[str]] = None,
        timeout: int = 10,
    ):
        base_options = dict(
            base_url=base_url,
            default_headers=ZHILIAN_HEADERS,
            proxy=proxy,
            proxy_pool=proxy_pool,
            timeout=timeout,
        )
        super().__init__(**base_options)
        self.signer = signer or ZhilianSign()

    def post(self, path: str, body: dict, headers: Optional[dict] = None, page_code: str = "0") -> tuple[int, Any]:
        """POST with signature headers injected; caller headers override them."""
        outgoing = self.signer.sign_headers(page_code)
        if headers:
            outgoing.update(headers)
        return super().post(path, body, outgoing)

    def get(self, path: str, params: Optional[dict] = None, headers: Optional[dict] = None, page_code: str = "0") -> tuple[int, Any]:
        """GET with signature headers injected; caller headers override them."""
        outgoing = self.signer.sign_headers(page_code)
        if headers:
            outgoing.update(headers)
        return super().get(path, params, outgoing)
def create_cgate_client(
    signer: Optional[ZhilianSign] = None,
    proxy: Optional[str] = None,
    proxy_pool: Optional[list[str]] = None,
) -> ZhilianClient:
    """Create a client bound to the cgate host."""
    options = {"signer": signer, "proxy": proxy, "proxy_pool": proxy_pool}
    return ZhilianClient(base_url=CGATE_BASE_URL, **options)
def create_capi_client(
    signer: Optional[ZhilianSign] = None,
    proxy: Optional[str] = None,
    proxy_pool: Optional[list[str]] = None,
) -> ZhilianClient:
    """Create a client bound to the capi host."""
    options = {"signer": signer, "proxy": proxy, "proxy_pool": proxy_pool}
    return ZhilianClient(base_url=CAPI_BASE_URL, **options)

View File

@ -0,0 +1,112 @@
"""
智联招聘 小程序爬虫入口
功能:
1. 从后端获取关键词优先断点续爬 > 失败重试 > 全新
2. 调用 SearchPositions 分页爬取职位列表
3. 每页实时上传数据 + 汇报进度
4. 支持从断点页码恢复
5. 可选搜索 job 时顺带抓取公司详情
启动:
python -m spiderJobs.platforms.zhilian.main
环境变量:
API_BASE_URL 后端地址 (默认 http://124.222.106.226:9999)
MAX_PAGES 每个关键词最大翻页数 (默认 3)
SLEEP_MIN_SECONDS 最小延迟秒数 (默认 10)
SLEEP_MAX_SECONDS 最大延迟秒数 (默认 20)
INLINE_COMPANY 是否内联抓公司 (默认 1 0 关闭)
"""
from __future__ import annotations
import os
import sys
from typing import Optional
_project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", ".."))
if _project_root not in sys.path:
sys.path.insert(0, _project_root)
from crawler_core.base import BaseFetcher, BaseSearcher
from spiderJobs.platforms.zhilian.api import GetCompanyDetail, SearchPositions
from spiderJobs.platforms.zhilian.client import ZhilianClient, create_cgate_client
from spiderJobs.runner.loop import run_crawl_loop
# Zhilian city-code mapping: Chinese city name -> code passed as ``city_code``.
# "全国" maps to an empty string, which SearchPositions treats as "no city
# filter"; create_searcher falls back to 538 (the 上海 entry) for unknown names.
CITY_CODE_MAP = {
"全国": "",
"北京": 530,
"上海": 538,
"广州": 763,
"深圳": 765,
"杭州": 653,
"成都": 801,
"南京": 635,
"武汉": 736,
"西安": 854,
"长沙": 749,
"重庆": 551,
"苏州": 639,
"天津": 531,
"厦门": 682,
"郑州": 719,
"合肥": 664,
"济南": 703,
"青岛": 704,
"大连": 600,
"东莞": 769,
"佛山": 766,
"珠海": 768,
"无锡": 636,
"宁波": 654,
}
def create_searcher(keyword: dict, http_client: ZhilianClient) -> BaseSearcher:
    """Build the Zhilian searcher for one keyword record.

    Reads the ``city`` and ``job`` entries of ``keyword``; city names missing
    from CITY_CODE_MAP fall back to code 538.
    """
    city_name = keyword.get("city", "")
    job_title = keyword.get("job", "")
    code = CITY_CODE_MAP.get(city_name, 538)
    return SearchPositions(keyword=job_title, city_code=code, client=http_client)
def extract_company_id(job: dict) -> Optional[str]:
    """Extract the company id (company number) from a Zhilian job dict.

    Looks at ``job["companyNumber"]`` first and falls back to
    ``job["company"]["number"]``.

    Args:
        job: One job record from a search result.

    Returns:
        The company number as a string, or None when neither location holds
        a truthy value.
    """
    company_number = job.get("companyNumber")
    if not company_number:
        company = job.get("company")
        # "company" may be present but null (or not an object at all); only
        # a dict can be asked for its "number".
        company_number = company.get("number") if isinstance(company, dict) else None
    return str(company_number) if company_number else None
def create_company_fetcher(company_id: str, http_client: ZhilianClient) -> BaseFetcher:
    """Build the Zhilian company-detail fetcher bound to the given client."""
    fetcher = GetCompanyDetail(number=company_id, client=http_client)
    return fetcher
def main():
    """Read the optional PROXY_URL setting and start the Zhilian crawl loop."""
    proxy_url = os.environ.get("PROXY_URL", "")
    # The proxy option is only forwarded when the variable is non-empty.
    options = {"proxy": proxy_url} if proxy_url else {}

    run_crawl_loop(
        platform="zhilian",
        create_searcher=create_searcher,
        create_client_fn=create_cgate_client,
        max_pages=3,
        data_type="job",
        client_kwargs=options,
        extract_company_id=extract_company_id,
        create_company_fetcher=create_company_fetcher,
    )
if __name__ == "__main__":
main()

View File

@ -0,0 +1,10 @@
"""
向后兼容桩 智联招聘签名
已迁移至 crawler_core.zhilian.sign
直接从 crawler_core 重新导出避免下游代码出现 ImportError
"""
from crawler_core.zhilian.sign import ZhilianSign # noqa: F401
__all__ = ["ZhilianSign"]

1
tests/job51/__init__.py Normal file
View File

@ -0,0 +1 @@
# tests/job51/

View File

@ -0,0 +1,216 @@
"""
前程无忧 (51Job) HTTP mock 测试ARCH-04 / QUAL-03
使用 unittest.mock.MagicMock 替代真实 HTTP 客户端无网络依赖
"""
from __future__ import annotations
from unittest.mock import MagicMock
from spiderJobs.platforms.job51.api import (
GetCompanyInfo,
GetJobDetail,
SearchCompanyJobs,
SearchRecommendJobs,
_parse_job51_response,
)
from spiderJobs.platforms.job51.client import Job51Client
from crawler_core.base import Result
# ─────────────────────────────────────────────────────────
# 1. _parse_job51_response 纯函数测试
# ─────────────────────────────────────────────────────────
class TestParseJob51Response:
    """Pure-function checks for ``_parse_job51_response``."""

    def test_http_error_returns_failure(self):
        parsed = _parse_job51_response(500, {})
        assert parsed.success is False
        assert parsed.status_code == 500

    def test_status_zero_returns_failure(self):
        body = {"status": 0, "message": "系统繁忙"}
        parsed = _parse_job51_response(200, body)
        assert parsed.success is False
        assert "系统繁忙" in parsed.error

    def test_status_str_zero_returns_failure(self):
        body = {"status": "0", "message": "错误"}
        parsed = _parse_job51_response(200, body)
        assert parsed.success is False

    def test_status_one_with_resultbody_joblist(self):
        job = {"jobId": "123", "jobName": "Python 工程师"}
        body = {
            "status": 1,
            "resultbody": {"jobList": {"items": [job], "totalCount": 1}},
        }
        parsed = _parse_job51_response(200, body)
        assert parsed.success is True
        assert len(parsed.list) == 1
        assert parsed.list[0]["jobName"] == "Python 工程师"

    def test_status_one_no_items_is_end_page(self):
        body = {"status": 1, "resultbody": {"jobList": {"items": []}}}
        parsed = _parse_job51_response(200, body)
        assert parsed.success is True
        assert parsed.is_end_page is True

    def test_non_dict_raw_returns_failure(self):
        parsed = _parse_job51_response(200, "not a dict")
        assert parsed.success is False

    def test_detail_payload(self):
        body = {"status": 1, "resultbody": {"companyName": "测试公司"}}
        parsed = _parse_job51_response(200, body)
        assert parsed.success is True
        assert parsed.data["companyName"] == "测试公司"
# ─────────────────────────────────────────────────────────
# 2. SearchRecommendJobs
# ─────────────────────────────────────────────────────────
class TestSearchRecommendJobs:
    """SearchRecommendJobs against a mocked client (no network)."""

    def _make_client(self, return_value):
        """Return a mock client whose ``post`` yields ``return_value``."""
        stub = MagicMock()
        stub.post.return_value = return_value
        return stub

    def test_search_success(self):
        body = {
            "status": 1,
            "resultbody": {
                "jobList": {"items": [{"jobId": "1", "jobName": "测试职位"}]}
            },
        }
        client = self._make_client((200, body))
        outcome = SearchRecommendJobs(job_area="020000", client=client).search(page_index=1)
        assert outcome.success is True
        assert len(outcome.list) == 1

    def test_search_http_error(self):
        client = self._make_client((403, {}))
        outcome = SearchRecommendJobs(client=client).search(page_index=1)
        assert outcome.success is False
        assert outcome.status_code == 403

    def test_search_biz_error(self):
        body = {"status": 0, "message": "接口限流"}
        client = self._make_client((200, body))
        outcome = SearchRecommendJobs(client=client).search(page_index=1)
        assert outcome.success is False
# ─────────────────────────────────────────────────────────
# 3. GetJobDetail路径拼接版
# ─────────────────────────────────────────────────────────
class TestGetJobDetail:
    """Mock-based tests for ``GetJobDetail.fetch``; the client's ``get`` is stubbed."""

    def test_fetch_success(self):
        """HTTP 200 with status=1 succeeds and resultbody fields are readable on ``data``."""
        detail = {"jobName": "数据工程师", "salary": "20k-30k"}
        client = MagicMock()
        client.get.return_value = (200, {"status": 1, "resultbody": detail})
        outcome = GetJobDetail(job_id="170651439", client=client).fetch()
        assert outcome.success is True
        assert outcome.data["jobName"] == "数据工程师"

    def test_fetch_exception_handled(self):
        """An exception raised by the client becomes a failed result carrying its message."""
        client = MagicMock(**{"get.side_effect": ConnectionError("网络超时")})
        outcome = GetJobDetail(job_id="123", client=client).fetch()
        assert outcome.success is False
        assert "网络超时" in outcome.error

    def test_fetch_http_error(self):
        """A 404 response fails and the HTTP code is kept on the result."""
        client = MagicMock(**{"get.return_value": (404, {})})
        outcome = GetJobDetail(job_id="nonexist", client=client).fetch()
        assert outcome.success is False
        assert outcome.status_code == 404
# ─────────────────────────────────────────────────────────
# 4. GetCompanyInfo
# ─────────────────────────────────────────────────────────
class TestGetCompanyInfo:
    """Mock-based tests for ``GetCompanyInfo.fetch``; the client's ``get`` is stubbed."""

    def test_fetch_success(self):
        """HTTP 200 with status=1 succeeds and resultbody fields are readable on ``data``."""
        company = {"companyName": "测试科技有限公司", "coId": "9825088"}
        client = MagicMock()
        client.get.return_value = (200, {"status": 1, "resultbody": company})
        outcome = GetCompanyInfo(company_id="9825088", client=client).fetch()
        assert outcome.success is True
        assert outcome.data["companyName"] == "测试科技有限公司"

    def test_fetch_exception(self):
        """A TimeoutError raised by the client results in a failed result, not a raise."""
        client = MagicMock(**{"get.side_effect": TimeoutError("请求超时")})
        outcome = GetCompanyInfo(company_id="123", client=client).fetch()
        assert outcome.success is False
# ─────────────────────────────────────────────────────────
# 5. SearchCompanyJobs
# ─────────────────────────────────────────────────────────
class TestSearchCompanyJobs:
    """Mock-based tests for ``SearchCompanyJobs.search``; the client's ``post`` is stubbed."""

    def test_search_success(self):
        """One item directly under resultbody.items yields a successful one-item result."""
        body = {"items": [{"jobId": "1"}], "totalCount": 1}
        client = MagicMock(**{"post.return_value": (200, {"status": 1, "resultbody": body})})
        outcome = SearchCompanyJobs(company_id="9825088", client=client).search(page_index=1)
        assert outcome.success is True
        assert len(outcome.list) == 1

    def test_search_empty(self):
        """An empty resultbody.items is a success flagged as the end page."""
        body = {"items": [], "totalCount": 0}
        client = MagicMock(**{"post.return_value": (200, {"status": 1, "resultbody": body})})
        outcome = SearchCompanyJobs(company_id="9825088", client=client).search(page_index=1)
        assert outcome.success is True
        assert outcome.is_end_page is True
# ─────────────────────────────────────────────────────────
# 6. Job51Client — sign 注入
# ─────────────────────────────────────────────────────────
class TestJob51ClientHeaders:
    """Checks on the header dict built by ``Job51Client._job51_headers``."""

    def test_headers_contain_sign(self):
        """The ``sign`` argument is copied verbatim into the ``sign`` header."""
        built = Job51Client()._job51_headers(sign="test_sign_value")
        assert built["sign"] == "test_sign_value"

    def test_headers_uuid_format(self):
        """The ``uuid`` header is at least 20 characters long."""
        built = Job51Client()._job51_headers(sign="abc")
        uuid_value = built["uuid"]
        assert len(uuid_value) >= 20

    def test_headers_empty_account(self):
        """A default-constructed client emits empty ``user-token`` and ``account-id`` headers."""
        built = Job51Client()._job51_headers(sign="xyz")
        assert built["user-token"] == ""
        assert built["account-id"] == ""

View File

@ -0,0 +1 @@
# tests/zhilian/

View File

@ -0,0 +1,198 @@
"""
智联招聘 HTTP mock 测试 (ARCH-05 / QUAL-03)
使用 unittest.mock.MagicMock 替代真实 HTTP 客户端，无网络依赖。
"""
from __future__ import annotations
from unittest.mock import MagicMock
from crawler_core.zhilian.sign import ZhilianSign
from spiderJobs.platforms.zhilian.api import (
GetCompanyDetail,
GetCompanyExtDetail,
GetPositionDetail,
SearchCompanyPositions,
SearchPositions,
)
from spiderJobs.platforms.zhilian.client import ZhilianClient
from crawler_core.base import Result
# ─────────────────────────────────────────────────────────
# 1. SearchPositions (POST cgate)
# ─────────────────────────────────────────────────────────
class TestSearchPositions:
    """Mock-based tests for ``SearchPositions.search``; the client's ``post`` is stubbed."""

    def _make_client(self, status_code=200, data=None):
        """Return a MagicMock client whose ``post`` yields ``(status_code, data or {})``."""
        mock_client = MagicMock()
        mock_client.post.return_value = (status_code, data or {})
        return mock_client

    def test_search_success_returns_list(self):
        """HTTP 200 with a populated data.list is reported as a success."""
        data = {
            "data": {
                "list": [{"number": "CC123", "name": "Python 工程师"}],
                "numFound": 1,
            },
        }
        searcher = SearchPositions(
            keyword="Python", city_code=538,
            client=self._make_client(200, data),
        )
        result = searcher.search(page_index=1)
        assert result.success is True

    def test_search_http_403(self):
        """A 403 response fails and the HTTP code is kept on the result."""
        searcher = SearchPositions(client=self._make_client(403, {}))
        result = searcher.search(page_index=1)
        assert result.success is False
        assert result.status_code == 403

    def test_search_http_500(self):
        """A 500 response is reported as a failure."""
        searcher = SearchPositions(client=self._make_client(500, {}))
        result = searcher.search(page_index=1)
        assert result.success is False

    def test_search_builds_keyword_param(self):
        """The search keyword must appear somewhere in the arguments passed to ``post``."""
        mock_client = MagicMock()
        mock_client.post.return_value = (200, {"data": {"list": []}})
        searcher = SearchPositions(keyword="Java", city_code=530, client=mock_client)
        searcher.search(page_index=1)
        assert mock_client.post.called
        # Check the full recorded call (positional and keyword arguments alike).
        # The previous version asserted only inside `if body:` on positional
        # argument 1, so it passed without checking anything whenever the
        # payload was sent by keyword or at another position.
        assert "Java" in str(mock_client.post.call_args)
# ─────────────────────────────────────────────────────────
# 2. GetPositionDetail (GET cgate)
# ─────────────────────────────────────────────────────────
class TestGetPositionDetail:
    """Mock-based tests for ``GetPositionDetail.fetch``; the client's ``get`` is stubbed."""

    def test_fetch_success(self):
        """HTTP 200 with a ``data`` object is reported as a success."""
        payload = {"data": {"number": "CC123", "jobName": "高级工程师"}}
        client = MagicMock(**{"get.return_value": (200, payload)})
        outcome = GetPositionDetail(number="CC123", client=client).fetch()
        assert outcome.success is True

    def test_fetch_404(self):
        """A 404 response fails and the HTTP code is kept on the result."""
        client = MagicMock(**{"get.return_value": (404, {})})
        outcome = GetPositionDetail(number="notexist", client=client).fetch()
        assert outcome.success is False
        assert outcome.status_code == 404
# ─────────────────────────────────────────────────────────
# 3. GetCompanyExtDetail (GET cgate)
# ─────────────────────────────────────────────────────────
class TestGetCompanyExtDetail:
    """Mock-based test for ``GetCompanyExtDetail.fetch``; the client's ``get`` is stubbed."""

    def test_fetch_success(self):
        """HTTP 200 with a ``data`` object is reported as a success."""
        payload = {"data": {"companyName": "智联测试公司"}}
        client = MagicMock(**{"get.return_value": (200, payload)})
        fetcher_kwargs = {
            "company_name": "智联测试公司",
            "company_number": "CZ123",
            "client": client,
        }
        outcome = GetCompanyExtDetail(**fetcher_kwargs).fetch()
        assert outcome.success is True
# ─────────────────────────────────────────────────────────
# 4. GetCompanyDetail (GET cgate)
# ─────────────────────────────────────────────────────────
class TestGetCompanyDetail:
    """Mock-based tests for ``GetCompanyDetail.fetch``; the client's ``get`` is stubbed."""

    def test_fetch_success(self):
        """HTTP 200 with a ``data`` object is reported as a success."""
        payload = {"data": {"companyNumber": "CZ123", "name": "智联公司"}}
        client = MagicMock(**{"get.return_value": (200, payload)})
        outcome = GetCompanyDetail(number="CZ123", client=client).fetch()
        assert outcome.success is True

    def test_fetch_http_error(self):
        """A 500 response is reported as a failure."""
        client = MagicMock(**{"get.return_value": (500, {})})
        outcome = GetCompanyDetail(number="CZ123", client=client).fetch()
        assert outcome.success is False
# ─────────────────────────────────────────────────────────
# 5. SearchCompanyPositions (GET capi) — 验证 sign_params 被调用
# ─────────────────────────────────────────────────────────
class TestSearchCompanyPositions:
    """Mock-based tests for ``SearchCompanyPositions.search`` with a stubbed signer and ``get``."""

    def _stub_client(self, sign_params, response):
        """Build a mock client and its ZhilianSign-spec'd signer.

        ``sign_params`` is what the signer's ``sign_params()`` returns and
        ``response`` is what the client's ``get`` returns. Returns the pair
        ``(client, signer)``.
        """
        signer = MagicMock(spec=ZhilianSign)
        signer.sign_params.return_value = sign_params
        client = MagicMock()
        client.signer = signer
        client.get.return_value = response
        return client, signer

    def test_search_success_calls_sign_params(self):
        """A successful search must have invoked the signer's ``sign_params``."""
        payload = {
            "data": {"list": [{"jobName": "测试岗位"}]},
            "pageInfo": {},
        }
        client, signer = self._stub_client({"at": "", "rt": ""}, (200, payload))
        outcome = SearchCompanyPositions(company_id="CZ123", client=client).search(page_index=1)
        assert outcome.success is True
        assert signer.sign_params.called  # confirm sign_params was invoked

    def test_search_http_error(self):
        """A 403 response is reported as a failure."""
        client, _ = self._stub_client({}, (403, {}))
        outcome = SearchCompanyPositions(company_id="CZ123", client=client).search(page_index=1)
        assert outcome.success is False
# ─────────────────────────────────────────────────────────
# 6. ZhilianClient — 签名头注入
# ─────────────────────────────────────────────────────────
class TestZhilianClientHeaders:
def test_sign_headers_injects_at_rt(self):
    """Tokens given to ZhilianSign show up in the ``x-zp-at`` / ``x-zp-rt`` headers."""
    client = ZhilianClient(signer=ZhilianSign(at="test_at", rt="test_rt"))
    signed = client.signer.sign_headers()
    assert signed["x-zp-at"] == "test_at"
    assert signed["x-zp-rt"] == "test_rt"
def test_sign_headers_has_required_keys(self):
    """The default client's signed headers include all four ``x-zp-*`` keys checked here."""
    headers = ZhilianClient().signer.sign_headers()
    required = ("x-zp-at", "x-zp-rt", "x-zp-action-id", "x-zp-device-id")
    for key in required:
        assert key in headers, f"缺少头信息: {key}"
def test_default_signer_empty_tokens(self):
    """A default-constructed client signs with empty ``x-zp-at`` and ``x-zp-rt`` tokens."""
    signed = ZhilianClient().signer.sign_headers()
    assert signed["x-zp-at"] == ""
    assert signed["x-zp-rt"] == ""
def test_sign_params_has_required_keys(self):
    """The default client's ``sign_params()`` includes all six keys checked here."""
    params = ZhilianClient().signer.sign_params()
    required = ("at", "rt", "channel", "platform", "version", "d")
    for key in required:
        assert key in params, f"缺少签名参数: {key}"