JobData/.claude/plan/project-optimization.md
2026-03-22 23:22:30 +08:00

12 KiB
Raw Permalink Blame History

📋 实施计划:项目功能修复与优化

生成时间2026-03-20 工作目录:/Users/win/2025/AICoding/JobData


一、问题全景(为什么很多功能用不了)

经过深度代码审查,共发现 22 个问题,其中多个问题会直接导致功能完全不可用。以下按功能模块分组。


🔴 数据清洗功能6 个问题 → 功能基本不可用)

# 严重度 文件 行号 问题 影响
C1 严重 services/cleaning.py 13 from jobs_spider.qcwy.search_company_jobs import _extract_items 引入私有函数,若该文件/函数不存在则整个模块 ImportError所有清洗 API 直接 500 全部清洗功能不可用
C2 严重 services/cleaning.py 多处 所有爬虫调用(boss_service.get_job_detail_by_id 等)是同步 HTTP 阻塞调用,在 async def 中直接执行,阻塞整个事件循环 高并发时应用无响应
C3 services/cleaning.py 28-36 Boss Token 加载后 _boss_token_loaded = True 永不刷新Token 过期后 Boss 清洗静默失败 Boss 平台清洗失效
C4 api/v1/cleaning/cleaning.py 285-308 process_task API 无超时保护,爬虫卡住则 HTTP 连接永久挂起 客户端超时
C5 前端缺失 后端菜单注册了 /cleaning/index/cleaning/monitor,但 web/src/views/不存在对应组件文件 点菜单白屏/404
C6 api/v1/cleaning/cleaning.py 71-75 source/status 直接拼入 ClickHouse SQLSQL 注入风险 安全漏洞

🔴 定时任务功能6 个问题 → 任务可能永久跳过)

# 严重度 文件 行号 问题 影响
S1 严重 core/locks.py 43 文件锁用 os.mkdir 实现,无 TTL 过期机制Worker 崩溃后锁目录永久残留,该任务永久跳过 任务永久失效
S2 严重 core/locks.py 38 异步函数中使用同步 redis.Redis阻塞事件循环 全局性能问题
S3 core/init_app.py 启动锁 .startup_lock 同样无 TTL崩溃后迁移和种子数据初始化永不再执行 启动异常
S4 core/locks.py 17 锁文件路径为相对路径 .lock_xxx,多 Worker 以不同 CWD 启动时锁完全失效 任务并发执行
S5 core/scheduler.py stats_jobecs_full_pipeline_job 调度时间完全重合(*/6h),同时执行压力大 资源竞争
S6 core/scheduler.py 181 company_cleaning_job 处理 30 个公司可能超过 5 分钟调度间隔,任务堆积被 skip 清洗停滞

🔴 安全问题4 个 → 凭据泄漏)

# 严重度 文件 行号 问题
X1 严重 ecs_full_pipeline.py 487-488 阿里云 AK/SK 硬编码在代码中,已在 git 历史里
X2 严重 settings/config.py 44-52 MySQL root 密码、SMTP 授权码、ClickHouse 密码硬编码
X3 严重 services/job.py 535 第三方 API 签名 salt 硬编码
X4 严重 core/dependency.py 26-28 token == "dev" 开发后门在生产环境同样有效

🟡 IP 告警功能3 个问题)

# 严重度 文件 行号 问题
I1 core/scheduler.py 273 邮件模板用 a.get('date') 但实际字段是 last_report_at,告警日期列永远为空
I2 core/ip_tracking.py 中间件读 response.body 对流式响应无效IP 计数不准
I3 core/ip_tracking.py 73 save() 未指定 update_fields,并发写存在竞态

🟡 分析功能2 个问题)

# 严重度 文件 行号 问题
A1 api/v1/analytics.py backports.zoneinfo 未在 Pipfile 中声明,若 Python 3.8 则 ImportError,整个分析路由挂
A2 api/v1/analytics.py Query(regex=...) 在 Pydantic v2 已弃用,应改 pattern

🟡 Ruff 报告的代码缺陷(之前已诊断,此处不重复)

共 34 个 lint 错误,其中 3 个 F821未定义变量 udt/fpt/json)会导致运行时崩溃。


二、实施步骤(按优先级排序)

Phase 1修复致命问题功能完全不可用

1.1 修复文件锁 — 添加 TTL 过期机制

文件app/core/locks.py

# 修改 _try_file_lock 方法
# 在 acquire 时写入时间戳到锁目录内的文件
# 在 acquire 失败时检查时间戳,若超过 TTL 则强制删除旧锁

async def acquire(self) -> bool:
    # Redis 路径不变
    if self._redis:
        return bool(self._redis.set(self._key, "locked", nx=True, ex=self.ttl))
    # 文件锁路径:改用绝对路径 + TTL 检查
    lock_dir = Path(tempfile.gettempdir()) / f"jobdata_lock_{self.name}"
    lock_meta = lock_dir / "meta"
    try:
        lock_dir.mkdir()
        lock_meta.write_text(str(time.time()))
        return True
    except FileExistsError:
        # 检查是否过期
        if lock_meta.exists():
            created = float(lock_meta.read_text())
            if time.time() - created > self.ttl:
                shutil.rmtree(lock_dir)  # 强制清理过期锁
                return await self.acquire()  # 重试
        return False

1.2 修复 Redis 同步阻塞 → 异步

文件app/core/locks.py

redis.Redis 替换为 redis.asyncio.Redis,所有 self._redis.set/get/delete 改为 await self._redis.set/get/delete

1.3 修复清洗模块 ImportError 风险

文件app/services/cleaning.py:13

# 修改前
from jobs_spider.qcwy.search_company_jobs import _extract_items as qcwy_extract_items

# 修改后:安全导入 + 降级
try:
    from jobs_spider.qcwy.search_company_jobs import _extract_items as qcwy_extract_items
except ImportError:
    logger.warning("qcwy search_company_jobs 模块不可用,公司职位提取功能降级")
    qcwy_extract_items = None

1.4 修复清洗中同步阻塞调用

文件app/services/cleaning.py 多处

将所有同步爬虫调用包装为 asyncio.to_thread

# 修改前
data = self.boss_service.get_job_detail_by_id(target)

# 修改后
data = await asyncio.to_thread(self.boss_service.get_job_detail_by_id, target)

涉及的方法:clean_by_job_idclean_by_company_nameclean_boss_company_jobsclean_qcwy_company_jobsclean_zhilian_company_jobs(共约 12 处调用)。

app/services/company_cleaner.py 中同样的模式也需要修复(同步爬虫调用包装为 to_thread)。

1.5 修复 Boss Token 永久缓存问题

文件app/services/cleaning.py:28-36app/services/company_cleaner.py:28-36

# 修改前
async def _ensure_boss_token_loaded(self) -> None:
    if self._boss_token_loaded and self.boss_service.login_data.get("mpt"):
        return  # 永不刷新

# 修改后:添加过期时间检查
async def _ensure_boss_token_loaded(self) -> None:
    now = time.time()
    if (self._boss_token_loaded
        and self.boss_service.login_data.get("mpt")
        and now - self._token_loaded_at < 3600):  # 1小时刷新一次
        return
    token_obj = await BossToken.filter(is_active=True).order_by("-updated_at").first()
    if token_obj:
        self.boss_service.login_data["mpt"] = token_obj.mpt_value
        self._boss_token_loaded = True
        self._token_loaded_at = now

Phase 2修复安全问题

2.1 凭据迁移config.py

创建 .env.example + 修改 config.pypydantic-settings 从环境变量读取(详见之前的 ruff-optimization 计划)。

2.2 移除 dev 后门

文件app/core/dependency.py:26-28

# 修改前
if token == "dev":
    user = await User.filter().first()
    return user

# 修改后:仅在开发环境允许
import os
if token == "dev" and os.getenv("APP_ENV", "production") == "development":
    user = await User.filter().first()
    return user

2.3 阿里云 AK/SK 移入环境变量

文件ecs_full_pipeline.py:487-488

# 修改前
ak = "LTAI5tBgW3hAzcnHBkZywxkD"
sk = "Il7M4bkJvdZIutkJH8pxhuMLrMvj5x"

# 修改后
ak = os.environ["ALIBABA_CLOUD_ACCESS_KEY_ID"]
sk = os.environ["ALIBABA_CLOUD_ACCESS_KEY_SECRET"]

Phase 3修复 IP 告警和分析功能

3.1 修复邮件模板字段名

文件app/core/scheduler.py:273

# 修改前
f"<td>{a.get('date')}</td>"

# 修改后
f"<td>{a.get('last_report_at', 'N/A')}</td>"

3.2 修复 analytics Query 参数弃用警告

文件app/api/v1/analytics.py

# 修改前
interval: str = Query("day", regex="^(day|hour|week|month)$")

# 修改后
interval: str = Query("day", pattern="^(day|hour|week|month)$")

3.3 修复 zoneinfo 导入

确认 Python 版本为 3.13(项目 Pipfile 声明),zoneinfo 是标准库,无需 backports。可直接删除 try/except只保留 from zoneinfo import ZoneInfo


Phase 4修复 Ruff 34 个 lint 错误

# 自动修复 22 个
pipenv run ruff check app/ --fix

# 手动修复剩余 12 个F821 × 3、E722 × 1、E402 × 5、其他 × 3

F821 重点修复:

  • job.py:348udt 未定义(需确认应为 update_date_time
  • job.py:374fpt 未定义(需确认应为 first_publish_time
  • crawler/zhilian.py:60 — 添加 import json

Phase 5代码去重和可维护性优化

  1. 合并 job.py 中 7 个重复的 _check_*_duplicate 为 1 个通用方法
  2. 删除死代码 _check_qcwy_company_duplicate_by_name
  3. job.pyrequests.post 替换为 httpx.AsyncClient
  4. 错调度时间:将 ecs_full_pipeline_job 偏移 30 分钟,避免与 stats_job 重合

三、关键文件索引

文件 操作 Phase 说明
app/core/locks.py 重构 1 文件锁 TTL + Redis 异步化
app/services/cleaning.py 修复 1 ImportError 防护 + async 阻塞 + Token 刷新
app/services/company_cleaner.py 修复 1 async 阻塞 + Token 刷新
app/core/dependency.py 修复 2 dev 后门加环境判断
app/settings/config.py 重构 2 凭据移入环境变量
ecs_full_pipeline.py 修复 2 AK/SK 移入环境变量
app/core/scheduler.py 修复 3 邮件字段名 + 调度时间偏移
app/api/v1/analytics.py 修复 3 regex→pattern + zoneinfo
app/services/job.py 修复 4+5 F821 + E722 + requests→httpx + 去重方法合并
app/services/crawler/zhilian.py 修复 4 添加 import json
web/src/views/cleaning/ 新建 5 创建前端清洗页面组件(可选)
.env.example 新建 2 环境变量模板

四、风险与缓解

风险 缓解措施
文件锁改造后旧锁目录残留 部署时手动清理 .lock_* 目录
Redis 异步化后连接池配置不同 保持相同连接参数,仅换客户端类
asyncio.to_thread 增加线程池压力 设置 max_workers=10 限制并发
凭据迁移后服务启动失败 先创建 .env 文件再部署
前端清洗页面组件工作量大 可先做最小 MVP列表 + 手动触发)

五、执行顺序

Phase 12h  → 修复致命问题:锁机制 + 清洗模块 + async 阻塞
Phase 21h  → 安全问题:凭据迁移 + dev 后门
Phase 330m → IP 告警 + 分析功能修复
Phase 430m → Ruff 34 个 lint 错误
Phase 52h  → 代码去重 + 前端组件(可选)

SESSION_ID供 /ccg:execute 使用)

  • CODEX_SESSION: N/A本次分析由 Claude 本地执行)
  • GEMINI_SESSION: N/A