JobData/jobs_spider/zhilian/zhilian_single.py

783 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import time
import random
import uuid
import hashlib
from typing import Dict, Any, List, Optional, Tuple
import requests
import os
from loguru import logger
from urllib.parse import quote
import socket
def sleep_random_between() -> float:
    """Sleep for a random duration and return the number of seconds slept.

    The bounds are read from the ``SLEEP_MIN_SECONDS`` (default ``1``) and
    ``SLEEP_MAX_SECONDS`` (default ``10``) environment variables. When the
    maximum is smaller than the minimum, the minimum is used for both bounds.
    When either value cannot be parsed as a float, the delay falls back to
    one second.

    Returns:
        float: The delay that was passed to ``time.sleep``.
    """
    try:
        min_seconds = float(os.getenv("SLEEP_MIN_SECONDS", "1"))
        max_seconds = float(os.getenv("SLEEP_MAX_SECONDS", "10"))
        if max_seconds < min_seconds:
            max_seconds = min_seconds
        wait_time = random.uniform(min_seconds, max_seconds)
    except (TypeError, ValueError):
        wait_time = 1.0
    # time.sleep() raises for negative, NaN and infinite values, and that call
    # sits outside the try block above. Clamp negatives to "no wait" and map
    # NaN/inf (e.g. SLEEP_MAX_SECONDS=inf) to the one-second fallback.
    if not (0.0 <= wait_time < float("inf")):
        wait_time = 0.0 if wait_time < 0 else 1.0
    time.sleep(wait_time)
    return wait_time
# Fixed configuration: edit the values below (or set the matching environment
# variables) and the script is ready to run.
CITY_ID = 801
PAGE_SIZE = 15
MAX_PAGES = 15
# SECURITY NOTE(review): the proxy credentials are committed in source. They
# are kept as defaults so existing deployments behave exactly as before, but
# they should be rotated and supplied through PROXY_USERNAME / PROXY_PASSWORD /
# PROXY_TUNNEL instead.
proxy_config = {
    "username": os.getenv("PROXY_USERNAME", "t13319619426654"),
    "password": os.getenv("PROXY_PASSWORD", "ln8aj9nl"),
    "tunnel": os.getenv("PROXY_TUNNEL", "s432.kdltps.com:15818"),
}
# Percent-encode the credentials so characters such as "@", ":" or "/" cannot
# corrupt the proxy URL. For the default values this is a no-op.
PROXY = (
    f"http://{quote(proxy_config['username'], safe='')}:"
    f"{quote(proxy_config['password'], safe='')}@{proxy_config['tunnel']}"
)
DEDUP = True
API_BASE_URL = os.getenv('API_BASE_URL', 'http://124.222.106.226:9999')
API_PUBLIC_HOST = os.getenv("API_PUBLIC_HOST")
# Log files go to ./logs, one file per day, rotated at midnight, kept 30 days.
os.makedirs("logs", exist_ok=True)
logger.add("logs/log_{time:YYYY-MM-DD}.log", level="INFO", rotation="00:00", retention="30 days", enqueue=True)
def log(*args: Any) -> None:
    """Write an INFO log entry prefixed with the current local timestamp.

    Args:
        *args: Values to log; each one is converted with ``str`` and the
            results are joined with single spaces.

    Returns:
        None
    """
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    message = " ".join(map(str, args))
    logger.info("{} {}", timestamp, message)
class ZhilianAPI:
    """Client for the Zhilian (zhaopin.com) job-search endpoints.

    Attributes:
        session: ``requests.Session`` used for every request to zhaopin.com.
        proxies: Proxy mapping installed on the session, or ``None`` when no
            proxy was supplied.
    """

    def __init__(self, proxy: Optional[str] = None) -> None:
        """Create the HTTP session and optionally route it through a proxy.

        Args:
            proxy: Proxy URL applied to both HTTP and HTTPS traffic.

        Returns:
            None
        """
        self.session = requests.Session()
        self.proxies = None
        if proxy:
            self.proxies = {"http": proxy, "https": proxy}
            self.session.proxies.update(self.proxies)

    @staticmethod
    def _dig(obj: Any, *keys: str) -> Any:
        """Follow ``keys`` through nested dicts; ``None`` if any level is not a dict."""
        for key in keys:
            if not isinstance(obj, dict):
                return None
            obj = obj.get(key)
        return obj

    def request_json(self, method: str, url: str, headers: Dict[str, str], params: Optional[Dict[str, Any]] = None,
                     json_body: Optional[Dict[str, Any]] = None, timeout: int = 30, max_retries: int = 3,
                     delay_range: Tuple[float, float] = (1.0, 3.0)) -> Optional[Dict[str, Any]]:
        """Send a request through the session and return the decoded JSON body.

        Every attempt is preceded by ``sleep_random_between()``. After a failed
        attempt the method waits ``1.5 * attempt_number`` seconds before the
        next one.

        Args:
            method: HTTP method name (upper-cased before sending).
            url: Request URL.
            headers: Request headers.
            params: Query-string parameters.
            json_body: JSON request body.
            timeout: Timeout in seconds.
            max_retries: Maximum number of attempts.
            delay_range: Currently unused; the delay comes from
                ``sleep_random_between()``. Kept for interface compatibility.

        Returns:
            dict|None: Decoded JSON response, or ``None`` when every attempt failed.
        """
        for attempt in range(max_retries):
            try:
                sleep_random_between()
                resp = self.session.request(method.upper(), url, headers=headers, params=params, json=json_body,
                                            timeout=timeout)
                resp.raise_for_status()
                data = resp.json()
                logger.info("请求参数 method={} url={} status={} params={} body={} resp_size={}", method.upper(), url, resp.status_code, params or {}, json_body or {}, len(resp.content))
                logger.info("原始数据 {}", json.dumps(data, ensure_ascii=False))
                return data
            except Exception as exc:
                # Best-effort by design: never raise, but leave a trace so a
                # failing endpoint or proxy is visible in the logs.
                logger.warning("request failed method={} url={} attempt={}/{} error={}",
                               method, url, attempt + 1, max_retries, repr(exc))
                if attempt == max_retries - 1:
                    return None
                time.sleep(1.5 * (attempt + 1))
        return None

    def fetch_company_desc_by_job(self, number: str) -> Optional[str]:
        """Fetch the company description for a job number.

        The PC position-detail endpoint is tried first; if it yields no
        non-empty description, the mini-program company-detail endpoint is
        used as a fallback.

        Args:
            number: Job number.

        Returns:
            str|None: Company description (may contain HTML), or ``None``.
        """
        client_id = gen_client_id()
        url_pc = "https://fe-api.zhaopin.com/c/i/jobs/position-detail-new"
        params_pc = {
            "number": number,
            "_v": gen_v(),
            "x-zp-page-request-id": gen_page_request_id(),
            "x-zp-client-id": client_id,
        }
        headers_pc = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36",
            "Accept": "application/json, text/plain, */*",
            "Accept-Encoding": "identity",
            "sec-ch-ua-platform": "macOS",
            "x-zp-business-system": "1",
            "x-zp-page-code": "4019",
            "sec-ch-ua": "\"Not/A)Brand\";v=\"8\", \"Chromium\";v=\"136\", \"Google Chrome\";v=\"136\"",
            "sec-ch-ua-mobile": "?0",
            "x-zp-platform": "13",
            "origin": "https://www.zhaopin.com",
            "sec-fetch-site": "same-site",
            "sec-fetch-mode": "cors",
            "sec-fetch-dest": "empty",
            "referer": "https://www.zhaopin.com/",
            "accept-language": "zh-CN,zh;q=0.9",
            "priority": "u=1, i",
            'Cookie': f"x-zp-client-id={client_id}"
        }
        data_pc = self.request_json("GET", url_pc, headers_pc, params=params_pc)
        desc_pc = self._dig(data_pc, "data", "detailedCompany", "companyDescription")
        if isinstance(desc_pc, str) and desc_pc:
            return desc_pc
        ua = _get_user_agent(True)
        url_mini = "https://cgate.zhaopin.com/positionbusiness/exposure/companyDetail"
        params_mini = {
            "number": number,
            "platform": "12",
            "version": "0.0.0",
        }
        headers_mini = build_headers_miniapp(ua)
        data_mini = self.request_json("GET", url_mini, headers_mini, params=params_mini)
        desc_mini = self._dig(data_mini, "data", "companyBase", "companyDescWithHtml")
        if isinstance(desc_mini, str) and desc_mini:
            return desc_mini
        return None

    def crawl_pc(self, city_id: int, page_size: int, max_pages: int, dedup: bool,
                 job_level3_code: Optional[str] = None) -> List[Dict[str, Any]]:
        """Crawl job postings for a city through the PC search endpoint.

        Pages are fetched from 1 to ``max_pages``; crawling stops early on a
        failed/non-200 response or an empty page. Each job with a ``number``
        gets a ``companyDesc`` field, and every non-empty page is reported
        through ``report_data``.

        Args:
            city_id: City ID.
            page_size: Number of jobs per page.
            max_pages: Maximum number of pages to fetch.
            dedup: Whether to skip jobs whose ``jobId`` was already seen in this call.
            job_level3_code: Optional level-3 job category code.

        Returns:
            list: All jobs collected during this call.
        """
        headers = build_headers_pc()
        base_url = "https://fe-api.zhaopin.com/c/i/search/positions"
        seen = set()
        items: List[Dict[str, Any]] = []
        for page in range(1, max_pages + 1):
            log("开始抓取PC职位页", {"city_id": city_id, "page": page, "page_size": page_size, "job_level3": job_level3_code or ""})
            params = {
                "_v": gen_v(),
                "x-zp-page-request-id": gen_page_request_id(),
                "x-zp-client-id": gen_client_id(),
            }
            payload = {
                "S_SOU_WORK_CITY": city_id,
                "order": 4,
                "pageSize": page_size,
                "pageIndex": page,
                "eventScenario": "pcSearchedSouSearch",
                "anonymous": 1,
                "platform": 13,
                "version": "0.0.0",
            }
            if job_level3_code:
                payload["S_SOU_JD_JOB_LEVEL3"] = job_level3_code
            data = self.request_json("POST", base_url, headers, params=params, json_body=payload)
            resp_code = self._dig(data, "code")
            if resp_code != 200:
                log("抓取失败或返回非200", {"page": page, "resp_code": resp_code})
                break
            # "data" may be present but null in the response, so dict.get's
            # default would not protect the chained lookup.
            lst = self._dig(data, "data", "list")
            if not isinstance(lst, list) or not lst:
                log("该页无职位数据", {"page": page})
                break
            page_items: List[Dict[str, Any]] = []
            for job in lst:
                if not isinstance(job, dict):
                    continue
                jid = job.get("jobId")
                if dedup and jid in seen:
                    continue
                if dedup and jid:
                    seen.add(jid)
                num = job.get("number")
                if num:
                    job["companyDesc"] = self.fetch_company_desc_by_job(str(num)) or ""
                items.append(job)
                page_items.append(job)
            log("该页职位数", {"page": page, "count": len(page_items)})
            if page_items:
                self.report_data(page_items, "job", "zhilian")
        log("PC抓取完成", {"total": len(items)})
        return items

    def get_local_ip(self) -> str:
        """Return the local IP address chosen for outbound traffic.

        Returns:
            str: The local address of a UDP socket connected to 8.8.8.8:80,
            or ``"127.0.0.1"`` when that fails.
        """
        try:
            # The context manager closes the socket even when connect() fails.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
                s.connect(("8.8.8.8", 80))
                return s.getsockname()[0]
        except Exception:
            return "127.0.0.1"

    def report_data(self, data_list: List[Dict[str, Any]], data_type: str, platform: str = "zhilian") -> bool:
        """Report a batch of records to the remote storage API.

        The request is sent with ``requests.post`` directly, not through
        ``self.session``, so the session proxy is not applied to it.

        Args:
            data_list: Records to upload.
            data_type: Data type label.
            platform: Platform identifier.

        Returns:
            bool: ``True`` when the API answered with a 2xx status, ``False``
            on any other status or on error.
        """
        try:
            universal_data = {
                "data_list": data_list,
                "data_type": data_type,
                "platform": platform
            }
            headers = {
                "accept": "application/json",
                "Content-Type": "application/json",
                'X-Forwarded-For': self.get_local_ip()
            }
            if API_PUBLIC_HOST:
                headers["Host"] = API_PUBLIC_HOST
                headers["X-Forwarded-Host"] = API_PUBLIC_HOST
            api_endpoint = f"{API_BASE_URL}/api/v1/universal/data/batch-store-async"
            logger.info("REPORT_DATA {}", json.dumps(universal_data, ensure_ascii=False))
            resp = requests.post(api_endpoint, json=universal_data, headers=headers, timeout=300)
            ok = 200 <= resp.status_code < 300
            log("数据上报完成", {"count": len(data_list), "status_code": resp.status_code, "ok": ok})
            return ok
        except Exception as exc:
            # Still best-effort, but a lost batch must not disappear silently.
            logger.warning("report_data failed count={} error={}", len(data_list), repr(exc))
            return False
def _get_user_agent(mobile: bool = True) -> str:
"""获取随机User-Agent
Args:
mobile: 是否使用移动端UA
Returns:
str: 随机UA字符串
"""
try:
from fake_useragent import UserAgent
ua = UserAgent(platforms=['mobile'] if mobile else None)
return ua.random
except Exception:
if mobile:
return "Mozilla/5.0 (Linux; Android 10; Mobile) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0 Mobile Safari/537.36"
return "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0 Safari/537.36"
def generate_xzp_rt() -> str:
    """Generate a value for the ``x-zp-rt`` header.

    Returns:
        str: Hex MD5 digest of a fresh UUID4 joined to the current timestamp.
    """
    seed = "{}-{}".format(uuid.uuid4(), time.time())
    digest = hashlib.md5(seed.encode("utf-8"))
    return digest.hexdigest()
def random_device_id() -> str:
    """Generate a random device ID.

    Returns:
        str: A UUID4 in its canonical hyphenated form, upper-cased.
    """
    device_uuid = uuid.uuid4()
    return f"{device_uuid}".upper()
def gen_page_request_id() -> str:
    """Generate a page request ID.

    Returns:
        str: ``<fixed prefix>-<current time in ms>-<random int in 0..999999>``.
    """
    prefix = "cf1e3b3e655b4eb5a306110a83c77c29"
    millis = int(time.time() * 1000)
    suffix = random.randint(0, 999999)
    return "-".join((prefix, str(millis), str(suffix)))
def gen_client_id() -> str:
    """Generate a client ID shaped like a version-4 UUID.

    Each ``x`` of the template ``xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx`` becomes
    a hex digit derived from the current time plus a random offset; the ``y``
    position is restricted to ``8``, ``9``, ``a`` or ``b``.

    Returns:
        str: The generated client ID.
    """
    seed = int(time.time() * 1000)
    try:
        seed += int(time.perf_counter() * 1000)
    except Exception:
        pass
    chars = []
    for slot in "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx":
        if slot not in "xy":
            # Hyphens and the literal version digit are copied through.
            chars.append(slot)
            continue
        nibble = int((seed + random.random() * 16) % 16)
        if slot == "y":
            nibble = (nibble & 0x3) | 0x8
        chars.append(format(nibble, "x"))
    return "".join(chars)
def gen_v() -> float:
    """Generate the ``_v`` query parameter.

    Returns:
        float: ``random.random()`` rounded to 8 decimal places.
    """
    sample = random.random()
    return round(sample, 8)
def build_headers_miniapp(user_agent: str) -> Dict[str, str]:
    """Build the request headers shared by the mini-program endpoints.

    Args:
        user_agent: User-Agent string to send.

    Returns:
        dict: Headers, including a fresh ``x-zp-rt`` value and a fresh random
        ``x-zp-device-id`` on every call.
    """
    headers: Dict[str, str] = {
        'User-Agent': user_agent,
        'x-zp-page-code': "7020",
    }
    # Per-call values; insertion order matches the original header order.
    headers['x-zp-rt'] = generate_xzp_rt()
    headers['x-zp-device-id'] = random_device_id()
    headers.update({
        'content-type': "application/json",
        'x-zp-version': "0.0.0",
        'x-zp-business-system': "73",
        'x-zp-action-id': "",
        'xweb_xhr': "1",
        'x-zp-channel': "wxxiaochengxu",
        'x-zp-platform': "12",
        'sec-fetch-site': "cross-site",
        'sec-fetch-mode': "cors",
        'sec-fetch-dest': "empty",
        'referer': "https://servicewechat.com/wxb7718fb9257e4fd2/529/page-frame.html",
        'accept-language': "zh-CN,zh;q=0.9",
    })
    return headers
def build_headers_pc() -> Dict[str, str]:
    """Build the request headers shared by the PC endpoints.

    Returns:
        dict: A new header mapping on every call.
    """
    header_pairs = (
        ("accept", "application/json, text/plain, */*"),
        ("accept-language", "zh-CN,zh;q=0.9"),
        ("content-type", "application/json;charset=UTF-8"),
        ("origin", "https://www.zhaopin.com"),
        ("priority", "u=1, i"),
        ("referer", "https://www.zhaopin.com/"),
        ("sec-ch-ua-mobile", "?0"),
        ("sec-fetch-dest", "empty"),
        ("sec-fetch-mode", "cors"),
        ("sec-fetch-site", "same-site"),
        ("x-zp-page-code", "0"),
    )
    return dict(header_pairs)
def request_json(method: str, url: str, headers: Dict[str, str], params: Optional[Dict[str, Any]] = None,
                 json_body: Optional[Dict[str, Any]] = None, proxies: Optional[str] = None,
                 timeout: int = 30, max_retries: int = 3, delay_range: Tuple[float, float] = (1.0, 3.0)) -> Optional[Dict[str, Any]]:
    """Send a request and return the decoded JSON body.

    Every attempt is preceded by ``sleep_random_between()``. After a failed
    attempt the function waits ``1.5 * attempt_number`` seconds before the
    next one.

    Args:
        method: HTTP method name (upper-cased before sending).
        url: Request URL.
        headers: Request headers.
        params: Query-string parameters.
        json_body: JSON request body.
        proxies: Proxy URL, e.g. "http://127.0.0.1:7890"; used for both HTTP and HTTPS.
        timeout: Timeout in seconds.
        max_retries: Maximum number of attempts.
        delay_range: Currently unused; the delay comes from
            ``sleep_random_between()``. Kept for interface compatibility.

    Returns:
        dict|None: Decoded JSON response, or ``None`` when every attempt failed.
    """
    proxy_dict = None
    if proxies:
        proxy_dict = {"http": proxies, "https": proxies}
        try:
            # Split on the LAST "@": a password that itself contains "@"
            # would otherwise be partially written to the log.
            logger.info("USE_PROXY_TUNNEL {}", proxies.rsplit("@", 1)[1])
        except Exception:
            logger.info("USE_PROXY_ENABLED")
    for attempt in range(max_retries):
        try:
            sleep_random_between()
            resp = requests.request(method.upper(), url, headers=headers, params=params, json=json_body,
                                    timeout=timeout, proxies=proxy_dict)
            resp.raise_for_status()
            data = resp.json()
            logger.info("请求参数 method={} url={} status={} params={} body={} resp_size={}", method.upper(), url, resp.status_code, params or {}, json_body or {}, len(resp.content))
            logger.info("原始数据 {}", json.dumps(data, ensure_ascii=False))
            return data
        except Exception as exc:
            # Best-effort by design: never raise, but leave a trace so a
            # failing endpoint or proxy is visible in the logs.
            logger.warning("request failed method={} url={} attempt={}/{} error={}",
                           method, url, attempt + 1, max_retries, repr(exc))
            if attempt == max_retries - 1:
                return None
            time.sleep(1.5 * (attempt + 1))
    return None
def fetch_company_desc_by_job(number: str, proxies: Optional[str] = None) -> Optional[str]:
    """Fetch the company description for a job number.

    The PC position-detail endpoint is tried first; if it yields no non-empty
    description, the mini-program company-detail endpoint is used as a
    fallback.

    Args:
        number: Job number.
        proxies: Optional proxy URL forwarded to ``request_json``.

    Returns:
        str|None: Company description (may contain HTML), or ``None``.
    """
    client_id = gen_client_id()
    url_pc = "https://fe-api.zhaopin.com/c/i/jobs/position-detail-new"
    params_pc = {
        "number": number,
        "_v": gen_v(),
        "x-zp-page-request-id": gen_page_request_id(),
        "x-zp-client-id": client_id,
    }
    headers_pc = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36",
        "Accept": "application/json, text/plain, */*",
        # Ask for an unencoded body, as ZhilianAPI.fetch_company_desc_by_job
        # does. Advertising "br"/"zstd" makes resp.json() fail when the
        # optional decoder packages are not installed.
        "Accept-Encoding": "identity",
        "sec-ch-ua-platform": "macOS",
        "x-zp-business-system": "1",
        "x-zp-page-code": "4019",
        "sec-ch-ua": "\"Not/A)Brand\";v=\"8\", \"Chromium\";v=\"136\", \"Google Chrome\";v=\"136\"",
        "sec-ch-ua-mobile": "?0",
        "x-zp-platform": "13",
        "origin": "https://www.zhaopin.com",
        "sec-fetch-site": "same-site",
        "sec-fetch-mode": "cors",
        "sec-fetch-dest": "empty",
        "referer": "https://www.zhaopin.com/",
        "accept-language": "zh-CN,zh;q=0.9",
        "priority": "u=1, i",
        "Cookie": f"x-zp-client-id={client_id}"
    }
    data_pc = request_json("GET", url_pc, headers_pc, params=params_pc, proxies=proxies)
    if data_pc and isinstance(data_pc, dict):
        detail = data_pc.get("data") or {}
        comp = detail.get("detailedCompany") or {}
        desc_pc = comp.get("companyDescription")
        if isinstance(desc_pc, str) and desc_pc:
            return desc_pc
    ua = _get_user_agent(True)
    url_mini = "https://cgate.zhaopin.com/positionbusiness/exposure/companyDetail"
    params_mini = {
        "number": number,
        "platform": "12",
        "version": "0.0.0",
    }
    headers_mini = build_headers_miniapp(ua)
    data_mini = request_json("GET", url_mini, headers_mini, params=params_mini, proxies=proxies)
    if data_mini and isinstance(data_mini, dict):
        desc_mini = ((data_mini.get("data") or {}).get("companyBase") or {}).get("companyDescWithHtml")
        if isinstance(desc_mini, str) and desc_mini:
            return desc_mini
    return None
def load_work_data(path: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """Load and parse the ``work.json`` data file.

    Args:
        path: File to read; when empty or ``None``, ``work.json`` next to
            this script is used.

    Returns:
        dict|None: The parsed JSON content, or ``None`` when the file cannot
        be read or parsed.
    """
    try:
        target = path or os.path.join(os.path.dirname(__file__), "work.json")
        with open(target, "r", encoding="utf-8") as handle:
            parsed = json.load(handle)
    except Exception:
        return None
    return parsed
def pick_random_city(work: Dict[str, Any]) -> Optional[Tuple[int, str]]:
    """Pick a random city from the ``work.json`` data.

    Candidates are read from the list-valued ``cities``, ``city``,
    ``workCity`` and ``subway`` fields of ``work["data"]``. An entry counts
    when its ``cityId`` (or ``code``) is an integer or a decimal string in
    the range 1..999999 and its ``name`` is a non-empty string. Malformed
    entries are skipped rather than raising.

    Args:
        work: Parsed ``work.json`` content.

    Returns:
        (int, str)|None: City ID and name, or ``None`` when no candidate exists.
    """
    data = work.get("data") if isinstance(work, dict) else None
    if not isinstance(data, dict):
        return None
    candidates: List[Tuple[int, str]] = []
    # Look at the obvious city-list fields.
    for key in ("cities", "city", "workCity", "subway"):
        entries = data.get(key)
        if not isinstance(entries, list):
            continue
        for item in entries:
            if not isinstance(item, dict):
                continue
            code = item.get("cityId") or item.get("code")
            name = item.get("name")
            if not isinstance(name, str) or not name:
                continue
            if isinstance(code, bool):
                # bool is an int subclass; True is not a city ID.
                continue
            if isinstance(code, int):
                city_id = code
            elif isinstance(code, str) and code.isdecimal():
                # isdecimal() only accepts characters int() can parse;
                # isdigit() also accepts e.g. superscript digits.
                city_id = int(code)
            else:
                continue
            if 1 <= city_id <= 999999:
                candidates.append((city_id, name))
    if candidates:
        return random.choice(candidates)
    return None
def pick_random_job_level3(work: Dict[str, Any]) -> Optional[Tuple[str, str]]:
    """Pick a random level-3 job code (``S_SOU_JD_JOB_LEVEL3``) from ``work.json``.

    Every dict nested under ``work["data"]`` is inspected. A dict counts when
    its stripped ``code`` string is at least 8 characters long, contains a
    digit, is not "不限", and its ``name`` is a string. Purely numeric codes
    are preferred; otherwise the part before the first ";" of codes that
    contain one; otherwise any collected code.

    Args:
        work: Parsed ``work.json`` content.

    Returns:
        (str, str)|None: Job code and name, or ``None`` when nothing qualifies.
    """
    def iter_dicts(node: Any):
        # Depth-first, parent before children, in container order.
        if isinstance(node, dict):
            yield node
            for child in node.values():
                yield from iter_dicts(child)
        elif isinstance(node, list):
            for child in node:
                yield from iter_dicts(child)

    data = work.get("data") if isinstance(work, dict) else None
    if not data:
        return None

    codes: List[Tuple[str, str]] = []
    for entry in iter_dicts(data):
        raw_code = entry.get("code")
        label = entry.get("name")
        if not isinstance(raw_code, str):
            continue
        code = raw_code.strip()
        if (code and code != "不限" and any(ch.isdigit() for ch in code)
                and len(code) >= 8 and isinstance(label, str)):
            codes.append((code, label))

    numeric = [pair for pair in codes if pair[0].isdigit()]
    if numeric:
        return random.choice(numeric)
    if not codes:
        return None
    leading_parts = [(code.split(";")[0], label) for code, label in codes if ";" in code]
    return random.choice(leading_parts or codes)
def fetch_service_params() -> Optional[Tuple[int, Optional[str]]]:
    """Fetch one available city/job keyword from the service and mark it used.

    The first item returned by ``/api/v1/keyword/available`` is taken. When
    it has an ``id``, it is reported to ``/api/v1/keyword/mark-used`` before
    its city is validated; a failure of that call is logged and otherwise
    ignored.

    Returns:
        (int, str|None)|None: ``(city_id, job_level3_code)``, or ``None``
        when the service is unreachable, answers with a non-200 status,
        returns no items, or the city cannot be converted to ``int``.
    """
    try:
        url = f"{API_BASE_URL}/api/v1/keyword/available"
        r = requests.get(url, params={"source": "zhilian", "limit": 1}, timeout=10)
        if r.status_code != 200:
            logger.warning("keyword/available returned status={}", r.status_code)
            return None
        js = r.json()
        data = js.get("data") or {}
        items = data.get("items") or []
        if not items:
            return None
        item = items[0]
        item_id = item.get("id")
        if item_id:
            try:
                murl = f"{API_BASE_URL}/api/v1/keyword/mark-used"
                requests.post(murl, json={"source": "zhilian", "ids": [item_id]}, timeout=10)
            except Exception as exc:
                # Best-effort: keep crawling, but make the failure visible.
                logger.warning("keyword/mark-used failed id={} error={}", item_id, repr(exc))
        city_raw = item.get("city")
        job_code = item.get("job")
        try:
            city_id = int(str(city_raw))
        except Exception:
            logger.warning("keyword item has a non-integer city: {}", repr(city_raw))
            return None
        job_code = str(job_code) if job_code else None
        return (city_id, job_code)
    except Exception as exc:
        logger.warning("fetch_service_params failed error={}", repr(exc))
        return None
def crawl_pc(city_id: int, page_size: int, max_pages: int, proxies: Optional[str], dedup: bool, job_level3_code: Optional[str] = None) -> None:
    """Crawl job postings for a city through the PC search endpoint.

    Pages are fetched from 1 to ``max_pages``; crawling stops early on a
    failed/non-200 response or an empty page. Each job gets a ``companyDesc``
    field when it has a job number, and every non-empty page is reported
    through ``report_data``.

    Args:
        city_id: City ID.
        page_size: Number of jobs per page.
        max_pages: Maximum number of pages to fetch.
        proxies: Proxy URL, or ``None`` for a direct connection.
        dedup: Whether to skip jobs whose ``jobId`` was already seen in this call.
        job_level3_code: Optional level-3 job category code.

    Returns:
        None
    """
    headers = build_headers_pc()
    base_url = "https://fe-api.zhaopin.com/c/i/search/positions"
    seen = set()
    items = []
    for page in range(1, max_pages + 1):
        log("开始抓取PC职位页", {"city_id": city_id, "page": page, "page_size": page_size, "job_level3": job_level3_code or ""})
        params = {
            "_v": gen_v(),
            "x-zp-page-request-id": gen_page_request_id(),
            "x-zp-client-id": gen_client_id(),
        }
        payload = {
            "S_SOU_WORK_CITY": city_id,
            "order": 4,
            "pageSize": page_size,
            "pageIndex": page,
            "eventScenario": "pcSearchedSouSearch",
            "anonymous": 1,
            "platform": 13,
            "version": "0.0.0",
        }
        if job_level3_code:
            payload["S_SOU_JD_JOB_LEVEL3"] = job_level3_code
        data = request_json("POST", base_url, headers, params=params, json_body=payload, proxies=proxies)
        resp_code = data.get("code") if isinstance(data, dict) else None
        if resp_code != 200:
            log("抓取失败或返回非200", {"page": page, "resp_code": resp_code})
            break
        # "data" may be present but null, so dict.get's default would not
        # protect the chained lookup.
        body = data.get("data")
        lst = body.get("list") if isinstance(body, dict) else None
        if not isinstance(lst, list) or not lst:
            log("该页无职位数据", {"page": page})
            break
        page_items = []
        for job in lst:
            if not isinstance(job, dict):
                continue
            jid = job.get("jobId")
            if dedup and jid in seen:
                continue
            if dedup and jid:
                seen.add(jid)
            # Attach the company description. The detail endpoint is queried
            # by job "number" (as ZhilianAPI.crawl_pc does); jobId is only a
            # fallback for records without one.
            num = job.get("number") or job.get("jobId")
            if num:
                desc = fetch_company_desc_by_job(str(num), proxies) or ""
                job["companyDesc"] = desc
            items.append(job)
            page_items.append(job)
        log("该页职位数", {"page": page, "count": len(page_items)})
        if page_items:
            report_data(page_items, "job", "zhilian")
    log("PC抓取完成", {"total": len(items)})
def report_data(data_list: List[Dict[str, Any]], data_type: str, platform: str = "zhilian") -> bool:
    """Report a batch of records to the remote storage API.

    Args:
        data_list: Records to upload.
        data_type: Data type label.
        platform: Platform identifier.

    Returns:
        bool: ``True`` when the API answered with a 2xx status, ``False`` on
        any other status or on error.
    """
    try:
        universal_data = {
            "data_list": data_list,
            "data_type": data_type,
            "platform": platform
        }
        headers = {
            "accept": "application/json",
            "Content-Type": "application/json",
            'X-Forwarded-For': get_local_ip()
        }
        if API_PUBLIC_HOST:
            headers["Host"] = API_PUBLIC_HOST
            headers["X-Forwarded-Host"] = API_PUBLIC_HOST
        api_endpoint = f"{API_BASE_URL}/api/v1/universal/data/batch-store-async"
        resp = requests.post(api_endpoint, json=universal_data, headers=headers, timeout=300)
        ok = 200 <= resp.status_code < 300
        log("数据上报完成", {"count": len(data_list), "status_code": resp.status_code, "ok": ok})
        return ok
    except Exception as exc:
        # Still best-effort, but a lost batch (or a missing helper such as
        # get_local_ip) must not disappear silently.
        logger.warning("report_data failed count={} error={}", len(data_list), repr(exc))
        return False
def main() -> None:
    """Script entry point: crawl in an endless loop.

    Each round takes its city/job from ``fetch_service_params()``. When the
    service yields nothing, it falls back to ``CITY_ID`` or a random city and
    job code from ``work.json``. A failing round is logged and the loop
    continues after a random pause.

    Returns:
        None (the loop never ends on its own).
    """
    work = load_work_data()
    api = ZhilianAPI(PROXY)
    while True:
        svc = fetch_service_params()
        if svc:
            city_id, job_code = svc
            city_name = None
            job_name = None
        else:
            city_id = CITY_ID
            city_name = None
            if work:
                rnd_city = pick_random_city(work)
                if isinstance(rnd_city, tuple):
                    city_id, city_name = rnd_city
            job_code = None
            job_name = None
            if work:
                rnd_job = pick_random_job_level3(work)
                if isinstance(rnd_job, tuple):
                    job_code, job_name = rnd_job
        log("开始一轮抓取", {"city_id": city_id, "city_name": city_name or "", "job_code": job_code or "", "job_name": job_name or ""})
        try:
            api.crawl_pc(city_id, PAGE_SIZE, MAX_PAGES, DEDUP, job_code)
        except Exception:
            # Keep the loop alive, but record the traceback; a silent pass
            # here would hide every bug in the crawl path.
            logger.exception("crawl round failed city_id={} job_code={}", city_id, job_code or "")
        sleep_random_between()
def get_local_ip() -> str:
    """Return the local IP address chosen for outbound traffic.

    Returns:
        str: The local address of a UDP socket connected to 8.8.8.8:80, or
        ``"127.0.0.1"`` when that fails.
    """
    try:
        # The context manager closes the socket even when connect() fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            return s.getsockname()[0]
    except Exception:
        return "127.0.0.1"


# The entry guard must stay last: main() loops forever, so anything defined
# after this point would never exist when the file is run as a script.
if __name__ == "__main__":
    main()