13 KiB
13 KiB
爬虫状态管理系统设计
Context
当前爬虫系统存在两个核心缺陷:
- 关键词消费不可恢复:
get_available()通过last_requested_date=today标记关键词已用,一旦爬虫崩溃,该关键词当天不会再被分配,已爬取的页面数据白白浪费。 - 分页状态无持久化:所有 3 个爬虫的分页逻辑都在内存中(Boss MAX_PAGES=3, QCWY MAX_PAGES=50, Zhilian MAX_PAGES=15),网络异常或进程重启后无法从断点恢复。
用户需求:关键词用完后标记不再重复请求 + 记录分页进度实现断点续爬。
架构决策
- 扩展现有 keyword 表(而非新建表):crawl 状态与 keyword 是 1:1 日粒度关系,新建表增加 JOIN 开销且无收益
- 状态机驱动:
crawl_status字段控制关键词生命周期,替代简单的 date 比较 - 服务端记录进度:爬虫每完成一页向服务端汇报,而非本地记录(支持多机分布式爬取)
- 过期检测:
crawling状态超时自动降级为partial,防止僵死
实施步骤
Step 1: 扩展 Keyword 模型
修改 app/models/keyword.py
在 BaseKeyword 中新增字段:
class BaseKeyword(Model):
id = fields.IntField(pk=True)
city = fields.CharField(max_length=64)
job = fields.CharField(max_length=128)
last_requested_date = fields.DateField(null=True)
last_requested_at = fields.DatetimeField(null=True)
# --- 新增:爬取状态管理 ---
crawl_status = fields.CharField(max_length=16, default="idle")
# 状态值: idle / crawling / completed / failed / partial
last_completed_page = fields.IntField(default=0) # 最后完成的页码
total_pages = fields.IntField(default=0) # 发现的总页数(0=未知)
jobs_found = fields.IntField(default=0) # 累计发现的职位数
crawl_started_at = fields.DatetimeField(null=True) # 当次爬取开始时间
crawler_id = fields.CharField(max_length=64, default="") # 爬虫实例标识
error_message = fields.TextField(default="") # 最后错误信息
retry_count = fields.IntField(default=0) # 当天重试次数
created_at = fields.DatetimeField(auto_now_add=True)
updated_at = fields.DatetimeField(auto_now=True)
class Meta:
abstract = True
状态机流转:
idle ──(get_available)──► crawling ──(all pages done)──► completed
│
├──(spider reports error)──► failed ──(retry<3)──► crawling
│
└──(timeout 30min)──► partial ──(get_available)──► crawling
次日 00:00:所有状态 → idle(通过 last_requested_date != today 自动重置)
Step 2: 重写 get_available() 控制器
修改 app/controllers/keyword.py 的 get_available() 方法
优先级调度逻辑(替代当前的简单 date 过滤):
优先级 1: crawl_status='partial' AND last_requested_date=today (断点续爬)
优先级 2: crawl_status='failed' AND retry_count<3 AND last_requested_date=today (失败重试)
优先级 3: (last_requested_date!=today OR last_requested_date IS NULL) (全新关键词)
返回值增加 last_completed_page 和 crawl_status,使爬虫知道从哪页开始:
items = [{
"id": r.id,
"city": r.city,
"job": r.job,
"last_completed_page": r.last_completed_page, # 新增
"crawl_status": r.crawl_status, # 新增
}]
认领时原子更新:
update_fields = {
"last_requested_date": today,
"last_requested_at": now,
"crawl_status": "crawling",
"crawl_started_at": now,
"crawler_id": crawler_id, # 从请求参数获取
}
# 如果是全新关键词(非续爬),重置分页状态
if is_fresh:
update_fields["last_completed_page"] = 0
update_fields["total_pages"] = 0
update_fields["jobs_found"] = 0
update_fields["error_message"] = ""
update_fields["retry_count"] = 0
Step 3: 新增进度汇报 API
修改 app/api/v1/keyword/keyword.py — 新增 2 个端点
3.1 页面进度汇报
POST /api/v1/keyword/page-progress
Body: {
"source": "boss",
"keyword_id": 123,
"page": 2,
"total_pages": 10, // 可选,爬虫发现的总页数
"jobs_found": 15 // 本页发现的职位数
}
控制器逻辑:
async def report_page_progress(self, source, keyword_id, page, total_pages=None, jobs_found=0):
model = self._ensure_model(source)
update_data = {"last_completed_page": page}
if total_pages is not None and total_pages > 0:
update_data["total_pages"] = total_pages
# jobs_found 累加
await model.filter(id=keyword_id).update(
last_completed_page=page,
jobs_found=F("jobs_found") + jobs_found,
**({"total_pages": total_pages} if total_pages else {})
)
3.2 爬取完成/失败汇报
POST /api/v1/keyword/crawl-complete
Body: {
"source": "boss",
"keyword_id": 123,
"status": "completed" | "failed",
"error_message": "optional error detail"
}
控制器逻辑:
async def report_crawl_complete(self, source, keyword_id, status, error_message=""):
model = self._ensure_model(source)
update_data = {"crawl_status": status, "error_message": error_message}
if status == "failed":
# 使用 F 表达式原子递增 retry_count
await model.filter(id=keyword_id).update(
crawl_status="failed",
error_message=error_message,
)
# retry_count 单独递增
obj = await model.filter(id=keyword_id).first()
if obj:
obj.retry_count += 1
await obj.save(update_fields=["retry_count"])
else:
await model.filter(id=keyword_id).update(**update_data)
Step 4: 新增请求 Schema
修改 app/schemas/keyword.py(或新建)
class PageProgressRequest(BaseModel):
source: str
keyword_id: int
page: int
total_pages: Optional[int] = None
jobs_found: int = 0
class CrawlCompleteRequest(BaseModel):
source: str
keyword_id: int
status: Literal["completed", "failed"]
error_message: str = ""
Step 5: 过期爬取检测(定时任务)
修改 app/core/scheduler.py — 新增 stale_crawl_cleanup_job
# 每 10 分钟检查一次
async def stale_crawl_cleanup():
"""将超过 30 分钟仍为 crawling 状态的关键词降级为 partial"""
threshold = datetime.now() - timedelta(minutes=30)
for model in [BossKeyword, QcwyKeyword, ZhilianKeyword]:
count = await model.filter(
crawl_status="crawling",
crawl_started_at__lt=threshold,
).update(crawl_status="partial")
if count:
logger.info(f"{model.__name__}: {count} 条僵死爬取任务已标记为 partial")
Step 6: 修改爬虫 — Boss 直聘
修改 jobs_spider/boss/boos_api.py
6.1 增强 fetch_service_params()
def fetch_service_params() -> Optional[Dict[str, Any]]:
try:
url = f"{API_BASE_URL}/api/v1/keyword/available"
crawler_id = f"boss-{os.getpid()}-{os.getenv('HOSTNAME', 'local')}"
r = requests.get(url, params={
"source": "boss", "limit": 1, "reserve": True,
"crawler_id": crawler_id, # 新增
}, timeout=10)
# ... 解析逻辑 ...
item = items[0]
# 不再需要 mark-used(get_available 已原子标记)
return {
"query": item["job"],
"city": item["city"],
"scene": 1,
"page": item.get("last_completed_page", 0) + 1, # 断点续爬
"keyword_id": item["id"], # 新增
}
except Exception:
return None
6.2 主循环添加进度汇报
# 在 get_job_list_multi_pages 的每页完成后回调中:
def on_page_complete(page_num, jobs_count, keyword_id):
try:
requests.post(f"{API_BASE_URL}/api/v1/keyword/page-progress", json={
"source": "boss",
"keyword_id": keyword_id,
"page": page_num,
"jobs_found": jobs_count,
}, timeout=5)
except Exception:
pass # 汇报失败不影响主流程
# 全部完成后:
def on_crawl_done(keyword_id, success, error_msg=""):
try:
requests.post(f"{API_BASE_URL}/api/v1/keyword/crawl-complete", json={
"source": "boss",
"keyword_id": keyword_id,
"status": "completed" if success else "failed",
"error_message": error_msg,
}, timeout=5)
except Exception:
pass
Step 7: 修改爬虫 — 前程无忧
修改 jobs_spider/qcwy/qcwy.py
同 Boss 结构:
fetch_service_params()增加keyword_id和last_completed_page返回crawl_multiple_pages()的start_page从last_completed_page + 1开始- 每页完成后调用
page-progressAPI - 全部完成/失败后调用
crawl-completeAPI
Step 8: 修改爬虫 — 智联招聘
修改 jobs_spider/zhilian/zhilian_single.py
同 Boss 结构:
fetch_service_params()增加keyword_id和last_completed_page返回crawl_pc()的起始页从last_completed_page + 1开始- 每页完成后调用
page-progressAPI - 全部完成/失败后调用
crawl-completeAPI
Step 9: 统计接口增强
修改 app/controllers/keyword.py 的 get_stats()
返回值增加爬取状态分布:
async def get_stats(self, source, on_date=None):
# ... 现有逻辑 ...
# 新增状态分布
crawling = await model.filter(crawl_status="crawling", last_requested_date=d).count()
completed = await model.filter(crawl_status="completed", last_requested_date=d).count()
failed = await model.filter(crawl_status="failed", last_requested_date=d).count()
partial = await model.filter(crawl_status="partial", last_requested_date=d).count()
return {
"data": {
"date": str(d), "total": total, "used": used, "unused": unused,
"crawl_status": {
"crawling": crawling,
"completed": completed,
"failed": failed,
"partial": partial,
}
}
}
Step 10: 数据库迁移
执行 Aerich 迁移以在 MySQL keyword 表中添加新字段:
aerich migrate --name add_crawl_state_fields
aerich upgrade
或在 init_app.py 的自动迁移中由 Aerich 自动处理(RUN_MIGRATIONS_ON_STARTUP=True)。
关键文件清单
| 文件 | 操作 | 说明 |
|---|---|---|
app/models/keyword.py |
修改 | 添加 8 个爬取状态字段 |
app/controllers/keyword.py |
修改 | 重写 get_available() 优先级调度 + 新增 2 个方法 |
app/api/v1/keyword/keyword.py |
修改 | 新增 page-progress / crawl-complete 端点 |
app/schemas/keyword.py |
新建 | PageProgressRequest / CrawlCompleteRequest |
app/core/scheduler.py |
修改 | 新增 stale_crawl_cleanup 定时任务 |
jobs_spider/boss/boos_api.py |
修改 | 断点续爬 + 进度汇报 |
jobs_spider/qcwy/qcwy.py |
修改 | 断点续爬 + 进度汇报 |
jobs_spider/zhilian/zhilian_single.py |
修改 | 断点续爬 + 进度汇报 |
效果
| 指标 | 改造前 | 改造后 |
|---|---|---|
| 崩溃恢复 | 关键词丢失,当天不可恢复 | 自动从断点续爬 |
| 页面重复爬取 | 100%(整个关键词重爬) | 0%(精确到页级别) |
| 僵死任务检测 | 无 | 30 分钟自动降级 |
| 失败重试 | 无(关键词当天报废) | 最多 3 次自动重试 |
| 爬取进度可见性 | 无 | 实时可查(stats API) |
风险与缓解
| 风险 | 缓解措施 |
|---|---|
| 字段迁移影响现有数据 | 所有新字段都有 default 值,迁移无破坏性 |
| 进度汇报增加 API 压力 | 汇报请求轻量(仅 UPDATE 单行),每页仅 1 次,可设 timeout=5s |
| 爬虫不升级导致状态不一致 | 新旧爬虫可共存:旧爬虫不汇报进度,关键词仍按 date 逻辑工作 |
| retry_count 无上限 | 硬限 3 次,超过 3 次的 failed 不再自动重试 |
验证方式
- 启动应用,确认 Aerich 自动迁移新增字段成功
- 手动调用
GET /api/v1/keyword/available?source=boss验证返回last_completed_page和crawl_status - 模拟断点:手动设置某关键词
crawl_status=partial, last_completed_page=2,再次get_available应优先返回该关键词 - 启动 Boss 爬虫,观察日志确认从
last_completed_page + 1开始 - 强制 kill 爬虫,等待 30 分钟后确认
stale_crawl_cleanup将状态降级为partial - 重启爬虫,确认自动续爬