JobData/.claude/plan/crawl-state-management.md
2026-03-22 23:22:30 +08:00

13 KiB
Raw Permalink Blame History

爬虫状态管理系统设计

Context

当前爬虫系统存在两个核心缺陷:

  1. 关键词消费不可恢复get_available() 通过 last_requested_date=today 标记关键词已用,一旦爬虫崩溃,该关键词当天不会再被分配,已爬取的页面数据白白浪费。
  2. 分页状态无持久化:所有 3 个爬虫的分页逻辑都在内存中Boss MAX_PAGES=3, QCWY MAX_PAGES=50, Zhilian MAX_PAGES=15网络异常或进程重启后无法从断点恢复。

用户需求:关键词用完后标记不再重复请求 + 记录分页进度实现断点续爬。


架构决策

  1. 扩展现有 keyword 表而非新建表crawl 状态与 keyword 是 1:1 日粒度关系,新建表增加 JOIN 开销且无收益
  2. 状态机驱动crawl_status 字段控制关键词生命周期,替代简单的 date 比较
  3. 服务端记录进度:爬虫每完成一页向服务端汇报,而非本地记录(支持多机分布式爬取)
  4. 过期检测crawling 状态超时自动降级为 partial,防止僵死

实施步骤

Step 1: 扩展 Keyword 模型

修改 app/models/keyword.py

BaseKeyword 中新增字段:

class BaseKeyword(Model):
    id = fields.IntField(pk=True)
    city = fields.CharField(max_length=64)
    job = fields.CharField(max_length=128)
    last_requested_date = fields.DateField(null=True)
    last_requested_at = fields.DatetimeField(null=True)

    # --- 新增:爬取状态管理 ---
    crawl_status = fields.CharField(max_length=16, default="idle")
    # 状态值: idle / crawling / completed / failed / partial
    last_completed_page = fields.IntField(default=0)      # 最后完成的页码
    total_pages = fields.IntField(default=0)               # 发现的总页数0=未知)
    jobs_found = fields.IntField(default=0)                # 累计发现的职位数
    crawl_started_at = fields.DatetimeField(null=True)     # 当次爬取开始时间
    crawler_id = fields.CharField(max_length=64, default="")  # 爬虫实例标识
    error_message = fields.TextField(default="")           # 最后错误信息
    retry_count = fields.IntField(default=0)               # 当天重试次数

    created_at = fields.DatetimeField(auto_now_add=True)
    updated_at = fields.DatetimeField(auto_now=True)

    class Meta:
        abstract = True

状态机流转:

idle ──(get_available)──► crawling ──(all pages done)──► completed
                              │
                              ├──(spider reports error)──► failed ──(retry<3)──► crawling
                              │
                              └──(timeout 30min)──► partial ──(get_available)──► crawling

次日 00:00所有状态 → idle通过 last_requested_date != today 自动重置)

Step 2: 重写 get_available() 控制器

修改 app/controllers/keyword.pyget_available() 方法

优先级调度逻辑(替代当前的简单 date 过滤):

优先级 1: crawl_status='partial' AND last_requested_date=today  (断点续爬)
优先级 2: crawl_status='failed' AND retry_count<3 AND last_requested_date=today  (失败重试)
优先级 3: (last_requested_date!=today OR last_requested_date IS NULL)  (全新关键词)

返回值增加 last_completed_pagecrawl_status,使爬虫知道从哪页开始:

items = [{
    "id": r.id,
    "city": r.city,
    "job": r.job,
    "last_completed_page": r.last_completed_page,  # 新增
    "crawl_status": r.crawl_status,                 # 新增
}]

认领时原子更新:

update_fields = {
    "last_requested_date": today,
    "last_requested_at": now,
    "crawl_status": "crawling",
    "crawl_started_at": now,
    "crawler_id": crawler_id,  # 从请求参数获取
}
# 如果是全新关键词(非续爬),重置分页状态
if is_fresh:
    update_fields["last_completed_page"] = 0
    update_fields["total_pages"] = 0
    update_fields["jobs_found"] = 0
    update_fields["error_message"] = ""
    update_fields["retry_count"] = 0

Step 3: 新增进度汇报 API

修改 app/api/v1/keyword/keyword.py — 新增 2 个端点

3.1 页面进度汇报

POST /api/v1/keyword/page-progress
Body: {
    "source": "boss",
    "keyword_id": 123,
    "page": 2,
    "total_pages": 10,     // 可选,爬虫发现的总页数
    "jobs_found": 15       // 本页发现的职位数
}

控制器逻辑:

async def report_page_progress(self, source, keyword_id, page, total_pages=None, jobs_found=0):
    model = self._ensure_model(source)
    update_data = {"last_completed_page": page}
    if total_pages is not None and total_pages > 0:
        update_data["total_pages"] = total_pages
    # jobs_found 累加
    await model.filter(id=keyword_id).update(
        last_completed_page=page,
        jobs_found=F("jobs_found") + jobs_found,
        **({"total_pages": total_pages} if total_pages else {})
    )

3.2 爬取完成/失败汇报

POST /api/v1/keyword/crawl-complete
Body: {
    "source": "boss",
    "keyword_id": 123,
    "status": "completed" | "failed",
    "error_message": "optional error detail"
}

控制器逻辑:

async def report_crawl_complete(self, source, keyword_id, status, error_message=""):
    model = self._ensure_model(source)
    update_data = {"crawl_status": status, "error_message": error_message}
    if status == "failed":
        # 使用 F 表达式原子递增 retry_count
        await model.filter(id=keyword_id).update(
            crawl_status="failed",
            error_message=error_message,
        )
        # retry_count 单独递增
        obj = await model.filter(id=keyword_id).first()
        if obj:
            obj.retry_count += 1
            await obj.save(update_fields=["retry_count"])
    else:
        await model.filter(id=keyword_id).update(**update_data)

Step 4: 新增请求 Schema

修改 app/schemas/keyword.py(或新建)

class PageProgressRequest(BaseModel):
    source: str
    keyword_id: int
    page: int
    total_pages: Optional[int] = None
    jobs_found: int = 0

class CrawlCompleteRequest(BaseModel):
    source: str
    keyword_id: int
    status: Literal["completed", "failed"]
    error_message: str = ""

Step 5: 过期爬取检测(定时任务)

修改 app/core/scheduler.py — 新增 stale_crawl_cleanup_job

# 每 10 分钟检查一次
async def stale_crawl_cleanup():
    """将超过 30 分钟仍为 crawling 状态的关键词降级为 partial"""
    threshold = datetime.now() - timedelta(minutes=30)
    for model in [BossKeyword, QcwyKeyword, ZhilianKeyword]:
        count = await model.filter(
            crawl_status="crawling",
            crawl_started_at__lt=threshold,
        ).update(crawl_status="partial")
        if count:
            logger.info(f"{model.__name__}: {count} 条僵死爬取任务已标记为 partial")

Step 6: 修改爬虫 — Boss 直聘

修改 jobs_spider/boss/boos_api.py

6.1 增强 fetch_service_params()

def fetch_service_params() -> Optional[Dict[str, Any]]:
    try:
        url = f"{API_BASE_URL}/api/v1/keyword/available"
        crawler_id = f"boss-{os.getpid()}-{os.getenv('HOSTNAME', 'local')}"
        r = requests.get(url, params={
            "source": "boss", "limit": 1, "reserve": True,
            "crawler_id": crawler_id,  # 新增
        }, timeout=10)
        # ... 解析逻辑 ...
        item = items[0]
        # 不再需要 mark-usedget_available 已原子标记)
        return {
            "query": item["job"],
            "city": item["city"],
            "scene": 1,
            "page": item.get("last_completed_page", 0) + 1,  # 断点续爬
            "keyword_id": item["id"],                          # 新增
        }
    except Exception:
        return None

6.2 主循环添加进度汇报

# 在 get_job_list_multi_pages 的每页完成后回调中:
def on_page_complete(page_num, jobs_count, keyword_id):
    try:
        requests.post(f"{API_BASE_URL}/api/v1/keyword/page-progress", json={
            "source": "boss",
            "keyword_id": keyword_id,
            "page": page_num,
            "jobs_found": jobs_count,
        }, timeout=5)
    except Exception:
        pass  # 汇报失败不影响主流程

# 全部完成后:
def on_crawl_done(keyword_id, success, error_msg=""):
    try:
        requests.post(f"{API_BASE_URL}/api/v1/keyword/crawl-complete", json={
            "source": "boss",
            "keyword_id": keyword_id,
            "status": "completed" if success else "failed",
            "error_message": error_msg,
        }, timeout=5)
    except Exception:
        pass

Step 7: 修改爬虫 — 前程无忧

修改 jobs_spider/qcwy/qcwy.py

同 Boss 结构:

  • fetch_service_params() 增加 keyword_idlast_completed_page 返回
  • crawl_multiple_pages()start_pagelast_completed_page + 1 开始
  • 每页完成后调用 page-progress API
  • 全部完成/失败后调用 crawl-complete API

Step 8: 修改爬虫 — 智联招聘

修改 jobs_spider/zhilian/zhilian_single.py

同 Boss 结构:

  • fetch_service_params() 增加 keyword_idlast_completed_page 返回
  • crawl_pc() 的起始页从 last_completed_page + 1 开始
  • 每页完成后调用 page-progress API
  • 全部完成/失败后调用 crawl-complete API

Step 9: 统计接口增强

修改 app/controllers/keyword.pyget_stats()

返回值增加爬取状态分布:

async def get_stats(self, source, on_date=None):
    # ... 现有逻辑 ...
    # 新增状态分布
    crawling = await model.filter(crawl_status="crawling", last_requested_date=d).count()
    completed = await model.filter(crawl_status="completed", last_requested_date=d).count()
    failed = await model.filter(crawl_status="failed", last_requested_date=d).count()
    partial = await model.filter(crawl_status="partial", last_requested_date=d).count()

    return {
        "data": {
            "date": str(d), "total": total, "used": used, "unused": unused,
            "crawl_status": {
                "crawling": crawling,
                "completed": completed,
                "failed": failed,
                "partial": partial,
            }
        }
    }

Step 10: 数据库迁移

执行 Aerich 迁移以在 MySQL keyword 表中添加新字段:

aerich migrate --name add_crawl_state_fields
aerich upgrade

或在 init_app.py 的自动迁移中由 Aerich 自动处理(RUN_MIGRATIONS_ON_STARTUP=True)。


关键文件清单

文件 操作 说明
app/models/keyword.py 修改 添加 8 个爬取状态字段
app/controllers/keyword.py 修改 重写 get_available() 优先级调度 + 新增 2 个方法
app/api/v1/keyword/keyword.py 修改 新增 page-progress / crawl-complete 端点
app/schemas/keyword.py 新建 PageProgressRequest / CrawlCompleteRequest
app/core/scheduler.py 修改 新增 stale_crawl_cleanup 定时任务
jobs_spider/boss/boos_api.py 修改 断点续爬 + 进度汇报
jobs_spider/qcwy/qcwy.py 修改 断点续爬 + 进度汇报
jobs_spider/zhilian/zhilian_single.py 修改 断点续爬 + 进度汇报

效果

指标 改造前 改造后
崩溃恢复 关键词丢失,当天不可恢复 自动从断点续爬
页面重复爬取 100%(整个关键词重爬) 0%(精确到页级别)
僵死任务检测 30 分钟自动降级
失败重试 无(关键词当天报废) 最多 3 次自动重试
爬取进度可见性 实时可查stats API

风险与缓解

风险 缓解措施
字段迁移影响现有数据 所有新字段都有 default 值,迁移无破坏性
进度汇报增加 API 压力 汇报请求轻量(仅 UPDATE 单行),每页仅 1 次,可设 timeout=5s
爬虫不升级导致状态不一致 新旧爬虫可共存:旧爬虫不汇报进度,关键词仍按 date 逻辑工作
retry_count 无上限 硬限 3 次,超过 3 次的 failed 不再自动重试

验证方式

  1. 启动应用,确认 Aerich 自动迁移新增字段成功
  2. 手动调用 GET /api/v1/keyword/available?source=boss 验证返回 last_completed_pagecrawl_status
  3. 模拟断点:手动设置某关键词 crawl_status=partial, last_completed_page=2,再次 get_available 应优先返回该关键词
  4. 启动 Boss 爬虫,观察日志确认从 last_completed_page + 1 开始
  5. 强制 kill 爬虫,等待 30 分钟后确认 stale_crawl_cleanup 将状态降级为 partial
  6. 重启爬虫,确认自动续爬