feat(02-01): migrate Boss spider layer from spiderJobs.core to crawler_core

- client.py: inherit crawler_core.http_client.HTTPClient, use crawler_core.boss.sign.BossSign
- api.py: use crawler_core.base.Result/BaseFetcher/BaseSearcher, fix self._http -> self.http_client
- main.py: import BaseFetcher/BaseSearcher and BossSign from crawler_core
- sign.py: replace with backward-compat stub re-exporting BossSign from crawler_core
Satisfies ARCH-03
This commit is contained in:
win 2026-03-21 19:00:30 +08:00
parent b20f77fa19
commit 46883cef8a
4 changed files with 598 additions and 0 deletions

View File

@ -0,0 +1,340 @@
"""
Boss直聘 - 所有 API 接口
每个类只负责参数构建HTTP 和算法由 client / core 层处理
响应格式适配:
Boss 使用 code/zpData区别于智联的 statusCode/data
code=0 表示成功zpData 为实际业务数据
"""
from __future__ import annotations
from typing import Any, Optional
from urllib.parse import urlencode
from crawler_core.base import BaseFetcher, BaseSearcher, Result
from spiderJobs.platforms.boss.client import BossClient, create_client
# ─────────────────────────────────────────────
# Boss 响应解析(覆写默认算法)
# ─────────────────────────────────────────────
def _parse_boss_response(http_code: int, raw: Any) -> Result:
"""
Boss 专用响应解析
Boss 响应格式:
{"code": 0, "message": "Success", "zpData": {...}}
code=0 成功其他为业务错误
"""
if http_code != 200:
return Result(
success=False,
status_code=http_code,
error=f"HTTP 请求失败: {http_code}",
)
if not isinstance(raw, dict):
return Result(success=False, status_code=http_code, error="响应格式异常")
biz_code = raw.get("code", -1)
if biz_code != 0:
return Result(
success=False,
status_code=biz_code,
error=raw.get("message") or f"业务错误: {biz_code}",
)
payload = raw.get("zpData") or {}
# 列表型响应
if isinstance(payload, dict) and "jobList" in payload:
job_list = payload.get("jobList", [])
has_more = payload.get("hasMore", False)
return Result(
success=True, status_code=200, data=payload,
list=job_list,
count=len(job_list),
is_end_page=not has_more,
)
# 列表型响应(公司职位列表使用 list 字段)
if isinstance(payload, dict) and "list" in payload:
items = payload.get("list", [])
has_more = payload.get("hasMore", False)
return Result(
success=True, status_code=200, data=payload,
list=items,
count=len(items),
is_end_page=not has_more,
)
return Result(success=True, status_code=200, data=payload)
# ─────────────────────────────────────────────
# 1. 首页推荐职位列表GET
# ─────────────────────────────────────────────
class SearchRecJobs(BaseSearcher):
"""
首页推荐/搜索职位列表无需登录
api = SearchRecJobs(city_code="101280600")
result = api.search()
all_jobs = api.load_all(max_pages=5)
"""
ENDPOINT = "/wapi/zpgeek/miniapp/homepage/recjoblist.json"
def __init__(
self,
*,
city_code: str = "101280600",
sort_type: int = 1,
district_code: str = "",
blue_welfare: str = "",
encrypt_expect_id: str = "",
page_size: int = 15,
client: Optional[BossClient] = None,
):
super().__init__(page_size=page_size, http_client=client or create_client())
self.city_code = city_code
self.sort_type = sort_type
self.district_code = district_code
self.blue_welfare = blue_welfare
self.encrypt_expect_id = encrypt_expect_id
def _build_params(self, page_index: int) -> dict:
return {
"cityCode": self.city_code,
"sortType": self.sort_type,
"page": page_index,
"pageSize": self.page_size,
"encryptExpectId": self.encrypt_expect_id,
"districtCode": self.district_code,
"blueWelfare": self.blue_welfare,
"appId": 10002,
}
def _request(self, params: dict) -> tuple[int, Any]:
"""覆写为 GET 请求"""
return self.http_client.get(self.ENDPOINT, params)
def _parse(self, http_code: int, raw: Any) -> Result:
return _parse_boss_response(http_code, raw)
# ─────────────────────────────────────────────
# 2. 职位详情(通过 batch 接口)
# ─────────────────────────────────────────────
class GetJobDetail(BaseFetcher):
"""
职位详情无需登录通过 /wapi/batch/requests 批量请求
detail = GetJobDetail(
security_id="xxx",
job_id="92ea3c76f9197a1503Vz09q8EFRR",
lid="8uF4BIOMvBU.search.63",
).fetch()
"""
ENDPOINT = "/wapi/batch/requests"
def __init__(
self,
*,
security_id: str,
job_id: str,
lid: str = "",
source: int = 10,
client: Optional[BossClient] = None,
):
super().__init__(http_client=client or create_client())
self.security_id = security_id
self.job_id = job_id
self.lid = lid
self.source = source
def _build_params(self) -> dict:
"""不使用batch 请求由 fetch 直接处理)"""
return {}
def fetch(self) -> Result:
"""覆写 fetch使用 batch 接口"""
detail_query = urlencode({
"securityId": self.security_id,
"jobId": self.job_id,
"lid": self.lid,
"source": self.source,
})
improvement_query = urlencode({
"securityId": self.security_id,
"jobId": self.job_id,
"lid": self.lid,
})
sub_reqs = [
{
"path": "/wapi/zpgeek/miniapp/job/detail.json",
"method": "GET",
"query": detail_query,
},
{
"path": "/wapi/zpgeek/miniapp/jobdetail/improvement/query.json",
"method": "GET",
"query": improvement_query,
},
]
try:
client: BossClient = self.http_client
http_code, data = client.batch(sub_reqs)
except Exception as e:
return Result(success=False, status_code=-1, error=str(e))
return self._parse(http_code, data)
def _parse(self, http_code: int, raw: Any) -> Result:
"""解析 batch 响应,合并子请求结果"""
if http_code != 200:
return Result(success=False, status_code=http_code, error=f"HTTP 请求失败: {http_code}")
if not isinstance(raw, dict):
return Result(success=False, status_code=http_code, error="响应格式异常")
biz_code = raw.get("code", -1)
if biz_code != 0:
return Result(
success=False,
status_code=biz_code,
error=raw.get("message") or f"业务错误: {biz_code}",
)
zp_data = raw.get("zpData") or {}
# 合并两个子请求的数据
detail = zp_data.get("/wapi/zpgeek/miniapp/job/detail.json", {})
improvement = zp_data.get("/wapi/zpgeek/miniapp/jobdetail/improvement/query.json", {})
merged = {
"detail": detail.get("zpData") if isinstance(detail, dict) else detail,
"improvement": improvement.get("zpData") if isinstance(improvement, dict) else improvement,
}
return Result(success=True, status_code=200, data=merged)
# ─────────────────────────────────────────────
# 3. 公司/品牌详情GET
# ─────────────────────────────────────────────
class GetBrandDetail(BaseFetcher):
"""
公司/品牌详情无需登录
detail = GetBrandDetail(brand_id="02cd05cce753437e33V50w~~").fetch()
"""
ENDPOINT = "/wapi/zpgeek/miniapp/brand/detail.json"
def __init__(self, *, brand_id: str, client: Optional[BossClient] = None):
super().__init__(http_client=client or create_client())
self.brand_id = brand_id
def _build_params(self) -> dict:
return {"brandId": self.brand_id, "appId": 10002}
def _parse(self, http_code: int, raw: Any) -> Result:
return _parse_boss_response(http_code, raw)
# ─────────────────────────────────────────────
# 4. 公司职位列表GET
# ─────────────────────────────────────────────
class SearchBrandJobs(BaseSearcher):
"""
公司在招职位列表无需登录
api = SearchBrandJobs(brand_id="02cd05cce753437e33V50w~~")
result = api.search()
all_jobs = api.load_all(max_pages=3)
"""
ENDPOINT = "/wapi/zpgeek/miniapp/brand/joblist.json"
def __init__(
self,
*,
brand_id: str,
query: str = "",
position_lv1: int = 0,
city: str = "",
experience: str = "",
salary: str = "",
page_size: int = 15,
client: Optional[BossClient] = None,
):
super().__init__(page_size=page_size, http_client=client or create_client())
self.brand_id = brand_id
self.query = query
self.position_lv1 = position_lv1
self.city = city
self.experience = experience
self.salary = salary
def _build_params(self, page_index: int) -> dict:
return {
"brandId": self.brand_id,
"query": self.query,
"page": page_index,
"hasMore": "true",
"positionLv1": self.position_lv1,
"city": self.city,
"experience": self.experience,
"salary": self.salary,
"appId": 10002,
}
def _request(self, params: dict) -> tuple[int, Any]:
"""覆写为 GET 请求"""
return self.http_client.get(self.ENDPOINT, params)
def _parse(self, http_code: int, raw: Any) -> Result:
return _parse_boss_response(http_code, raw)
# ─────────────────────────────────────────────
# 使用示例
# ─────────────────────────────────────────────
if __name__ == "__main__":
import json
print("=== 1. 首页推荐职位 ===")
r = SearchRecJobs(city_code="101280600").search()
print(f"成功: {r.success}, 本页 {len(r.list)} 条, is_end_page: {r.is_end_page}")
if r.list:
print(f"第一条: {json.dumps(r.list[0], ensure_ascii=False, indent=2)[:200]}...")
print("\n=== 2. 公司详情 ===")
r = GetBrandDetail(brand_id="02cd05cce753437e33V50w~~").fetch()
print(f"成功: {r.success}")
if r.data:
print(f"数据: {json.dumps(r.data, ensure_ascii=False, indent=2)[:300]}...")
print("\n=== 3. 公司职位列表 ===")
r = SearchBrandJobs(brand_id="02cd05cce753437e33V50w~~").search()
print(f"成功: {r.success}, 本页 {len(r.list)}")
# 注: 职位详情需要 security_id需要先从搜索结果中获取
print("\n=== 4. 职位详情(需要 security_id===")
if SearchRecJobs(city_code="101280600").search().list:
first_job = SearchRecJobs(city_code="101280600").search().list[0]
sid = first_job.get("securityId", "")
jid = first_job.get("encryptJobId", "")
if sid and jid:
r = GetJobDetail(security_id=sid, job_id=jid).fetch()
print(f"成功: {r.success}")
if r.data:
print(f"数据: {json.dumps(r.data, ensure_ascii=False, indent=2)[:300]}...")
else:
print("搜索结果中未找到 securityId/encryptJobId 字段")
else:
print("搜索结果为空,跳过")

View File

@ -0,0 +1,122 @@
"""
Boss直聘 HTTP 客户端
在通用 HTTPClient 上叠加 Boss 特有的 headers Traceid 注入
"""
from __future__ import annotations
from typing import Any, Optional
from crawler_core.http_client import HTTPClient
from crawler_core.boss.sign import BossSign
BASE_URL = "https://www.zhipin.com"
# Boss 小程序特有的默认请求头
BOSS_HEADERS = {
"content-type": "application/x-www-form-urlencoded",
"user-agent": (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36 "
"MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI "
"MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.7(0x13080712) "
"UnifiedPCMacWechat(0xf2641702) XWEB/18788"
),
"x-requested-with": "XMLHttpRequest",
"xweb_xhr": "1",
"zp_app_id": "10002",
"zp_product_id": "10002",
"ver": "14.0400",
"mini_ver": "14.0400",
"platform": "zhipin/mac",
"ua": '{"model":"Mac16,8","platform":"mac"}',
"scene": "1256",
"sec-fetch-site": "cross-site",
"sec-fetch-mode": "cors",
"sec-fetch-dest": "empty",
"referer": "https://servicewechat.com/wxa8da525af05281f3/601/page-frame.html",
"accept-language": "zh-CN,zh;q=0.9",
}
class BossClient(HTTPClient):
"""
Boss直聘 HTTP 客户端
继承通用 HTTPClient每次请求自动注入 Traceid
Args:
signer: BossSign 实例可选
tunnel_proxy: 隧道代理地址每次请求自动换 IP
proxy: 固定代理地址
proxy_pool: 代理池列表
timeout: 请求超时秒数
"""
def __init__(
self,
signer: Optional[BossSign] = None,
tunnel_proxy: Optional[str] = None,
proxy: Optional[str] = None,
proxy_pool: Optional[list[str]] = None,
timeout: int = 10,
):
super().__init__(
base_url=BASE_URL,
default_headers=BOSS_HEADERS,
tunnel_proxy=tunnel_proxy,
proxy=proxy,
proxy_pool=proxy_pool,
timeout=timeout,
)
self.signer = signer or BossSign()
def _boss_headers(self) -> dict:
"""构造每次请求需要动态更新的 Boss 请求头"""
return {
"mpt": self.signer.mpt,
"wt2": self.signer.wt2,
"Traceid": BossSign.generate_traceid("M-W"),
}
def post(self, path: str, body: dict, headers: Optional[dict] = None) -> tuple[int, Any]:
"""POST 请求,自动注入 Boss headers"""
boss_h = self._boss_headers()
if headers:
boss_h.update(headers)
return super().post(path, body, boss_h)
def get(self, path: str, params: Optional[dict] = None, headers: Optional[dict] = None) -> tuple[int, Any]:
"""GET 请求,自动注入 Boss headers"""
boss_h = self._boss_headers()
if headers:
boss_h.update(headers)
return super().get(path, params, boss_h)
def batch(self, sub_reqs: list[dict]) -> tuple[int, Any]:
"""
批量请求 /wapi/batch/requests
Args:
sub_reqs: 子请求列表, 每个元素格式:
{"path": "/wapi/...", "method": "GET", "query": "key=val&..."}
Returns:
(http_code, response_json)
"""
body = {"subReqs": sub_reqs, "appId": 10002}
return self.post(
"/wapi/batch/requests",
body,
headers={"content-type": "application/json"},
)
def create_client(
signer: Optional[BossSign] = None,
tunnel_proxy: Optional[str] = None,
proxy: Optional[str] = None,
proxy_pool: Optional[list[str]] = None,
) -> BossClient:
"""创建 Boss 客户端"""
return BossClient(signer=signer, tunnel_proxy=tunnel_proxy, proxy=proxy, proxy_pool=proxy_pool)

View File

@ -0,0 +1,126 @@
"""
Boss直聘 小程序爬虫入口
功能:
1. 从后端获取关键词优先断点续爬 > 失败重试 > 全新
2. 调用 SearchRecJobs 分页爬取职位列表
3. 每页实时上传数据 + 汇报进度
4. 支持从断点页码恢复
5. 可选搜索 job 时顺带抓取公司详情
启动:
python -m spiderJobs.platforms.boss.main
环境变量:
API_BASE_URL 后端地址 (默认 http://124.222.106.226:9999)
MAX_PAGES 每个关键词最大翻页数 (默认 3)
SLEEP_MIN_SECONDS 最小延迟秒数 (默认 10)
SLEEP_MAX_SECONDS 最大延迟秒数 (默认 20)
BOSS_MPT Boss Token (mpt)
BOSS_WT2 Boss Token (wt2)
INLINE_COMPANY 是否内联抓公司 (默认 1 0 关闭)
"""
from __future__ import annotations
import os
import sys
from typing import Optional
# 确保项目根目录在 sys.path 中
_project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", ".."))
if _project_root not in sys.path:
sys.path.insert(0, _project_root)
from crawler_core.base import BaseFetcher, BaseSearcher
from spiderJobs.platforms.boss.api import GetBrandDetail, SearchRecJobs
from spiderJobs.platforms.boss.client import BossClient, create_client
from crawler_core.boss.sign import BossSign
from spiderJobs.runner.loop import run_crawl_loop
# Boss 城市代码映射(关键词中的城市名 -> Boss cityCode
CITY_CODE_MAP = {
"全国": "100010000",
"北京": "101010100",
"上海": "101020100",
"广州": "101280100",
"深圳": "101280600",
"杭州": "101210100",
"成都": "101270100",
"南京": "101190100",
"武汉": "101200100",
"西安": "101110100",
"长沙": "101250100",
"重庆": "101040100",
"苏州": "101190400",
"天津": "101030100",
"厦门": "101230200",
"郑州": "101180100",
"合肥": "101220100",
"济南": "101120100",
"青岛": "101120200",
"大连": "101070200",
"东莞": "101281600",
"佛山": "101280800",
"珠海": "101280700",
"无锡": "101190200",
"宁波": "101210400",
}
def create_searcher(keyword: dict, http_client: BossClient) -> BaseSearcher:
"""根据关键词创建 Boss 搜索器"""
city = keyword.get("city", "")
city_code = CITY_CODE_MAP.get(city, "101280600")
return SearchRecJobs(
city_code=city_code,
client=http_client,
)
def extract_company_id(job: dict) -> Optional[str]:
"""从 Boss job dict 中提取公司 ID (brandId)"""
brand_id = job.get("brandId")
return str(brand_id) if brand_id else None
def create_company_fetcher(company_id: str, http_client: BossClient) -> BaseFetcher:
"""创建 Boss 公司详情 fetcher"""
return GetBrandDetail(brand_id=company_id, client=http_client)
def main():
mpt = os.environ.get("BOSS_MPT", "")
wt2 = os.environ.get("BOSS_WT2", "")
client_kwargs = {}
if mpt or wt2:
signer = BossSign(mpt=mpt, wt2=wt2)
client_kwargs["signer"] = signer
tunnel = os.environ.get("PROXY_TUNNEL", "")
if tunnel:
scheme = os.environ.get("PROXY_SCHEME", "http")
username = os.environ.get("PROXY_USERNAME", "")
password = os.environ.get("PROXY_PASSWORD", "")
if username and password:
client_kwargs["tunnel_proxy"] = f"{scheme}://{username}:{password}@{tunnel}"
else:
client_kwargs["tunnel_proxy"] = f"{scheme}://{tunnel}"
run_crawl_loop(
platform="boss",
create_searcher=create_searcher,
create_client_fn=create_client,
max_pages=3,
data_type="job",
client_kwargs=client_kwargs,
extract_company_id=extract_company_id,
create_company_fetcher=create_company_fetcher,
)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,10 @@
"""
向后兼容桩 Boss直聘签名
已迁移至 crawler_core.boss.sign
直接从 crawler_core 重新导出避免下游代码出现 ImportError
"""
from crawler_core.boss.sign import BossSign # noqa: F401
__all__ = ["BossSign"]