318 lines
12 KiB
Markdown
318 lines
12 KiB
Markdown
# 📋 实施计划:项目功能修复与优化
|
||
|
||
> 生成时间:2026-03-20
|
||
> 工作目录:/Users/win/2025/AICoding/JobData
|
||
|
||
---
|
||
|
||
## 一、问题全景(为什么很多功能用不了)
|
||
|
||
经过深度代码审查,共发现 **22 个问题**,其中多个问题会直接导致功能完全不可用。以下按功能模块分组。
|
||
|
||
---
|
||
|
||
### 🔴 数据清洗功能(6 个问题 → 功能基本不可用)
|
||
|
||
| # | 严重度 | 文件 | 行号 | 问题 | 影响 |
|
||
|---|--------|------|------|------|------|
|
||
| C1 | **严重** | `services/cleaning.py` | 13 | `from jobs_spider.qcwy.search_company_jobs import _extract_items` 引入私有函数,若该文件/函数不存在则整个模块 `ImportError`,**所有清洗 API 直接 500** | 全部清洗功能不可用 |
|
||
| C2 | **严重** | `services/cleaning.py` | 多处 | 所有爬虫调用(`boss_service.get_job_detail_by_id` 等)是同步 HTTP 阻塞调用,在 `async def` 中直接执行,**阻塞整个事件循环** | 高并发时应用无响应 |
|
||
| C3 | **高** | `services/cleaning.py` | 28-36 | Boss Token 加载后 `_boss_token_loaded = True` 永不刷新,Token 过期后 Boss 清洗**静默失败** | Boss 平台清洗失效 |
|
||
| C4 | **高** | `api/v1/cleaning/cleaning.py` | 285-308 | `process_task` API 无超时保护,爬虫卡住则 HTTP 连接永久挂起 | 客户端超时 |
|
||
| C5 | **高** | **前端缺失** | — | 后端菜单注册了 `/cleaning/index` 和 `/cleaning/monitor`,但 `web/src/views/` 下**不存在对应组件文件** | 点菜单白屏/404 |
|
||
| C6 | **中** | `api/v1/cleaning/cleaning.py` | 71-75 | `source`/`status` 直接拼入 ClickHouse SQL,SQL 注入风险 | 安全漏洞 |
|
||
|
||
---
|
||
|
||
### 🔴 定时任务功能(6 个问题 → 任务可能永久跳过)
|
||
|
||
| # | 严重度 | 文件 | 行号 | 问题 | 影响 |
|
||
|---|--------|------|------|------|------|
|
||
| S1 | **严重** | `core/locks.py` | 43 | 文件锁用 `os.mkdir` 实现,**无 TTL 过期机制**,Worker 崩溃后锁目录永久残留,该任务**永久跳过** | 任务永久失效 |
|
||
| S2 | **严重** | `core/locks.py` | 38 | 异步函数中使用同步 `redis.Redis`,**阻塞事件循环** | 全局性能问题 |
|
||
| S3 | **高** | `core/init_app.py` | — | 启动锁 `.startup_lock` 同样无 TTL,崩溃后**迁移和种子数据初始化永不再执行** | 启动异常 |
|
||
| S4 | **高** | `core/locks.py` | 17 | 锁文件路径为**相对路径** `.lock_xxx`,多 Worker 以不同 CWD 启动时锁完全失效 | 任务并发执行 |
|
||
| S5 | **中** | `core/scheduler.py` | — | `stats_job` 与 `ecs_full_pipeline_job` 调度时间完全重合(`*/6h`),同时执行压力大 | 资源竞争 |
|
||
| S6 | **中** | `core/scheduler.py` | 181 | `company_cleaning_job` 处理 30 个公司可能超过 5 分钟调度间隔,任务堆积被 skip | 清洗停滞 |
|
||
|
||
---
|
||
|
||
### 🔴 安全问题(4 个 → 凭据泄漏)
|
||
|
||
| # | 严重度 | 文件 | 行号 | 问题 |
|
||
|---|--------|------|------|------|
|
||
| X1 | **严重** | `ecs_full_pipeline.py` | 487-488 | 阿里云 AK/SK 硬编码在代码中,已在 git 历史里 |
|
||
| X2 | **严重** | `settings/config.py` | 44-52 | MySQL root 密码、SMTP 授权码、ClickHouse 密码硬编码 |
|
||
| X3 | **严重** | `services/job.py` | 535 | 第三方 API 签名 salt 硬编码 |
|
||
| X4 | **严重** | `core/dependency.py` | 26-28 | `token == "dev"` 开发后门在生产环境同样有效 |
|
||
|
||
---
|
||
|
||
### 🟡 IP 告警功能(3 个问题)
|
||
|
||
| # | 严重度 | 文件 | 行号 | 问题 |
|
||
|---|--------|------|------|------|
|
||
| I1 | **中** | `core/scheduler.py` | 273 | 邮件模板用 `a.get('date')` 但实际字段是 `last_report_at`,告警日期列**永远为空** |
|
||
| I2 | **中** | `core/ip_tracking.py` | — | 中间件读 `response.body` 对流式响应无效,IP 计数不准 |
|
||
| I3 | **低** | `core/ip_tracking.py` | 73 | `save()` 未指定 `update_fields`,并发写存在竞态 |
|
||
|
||
---
|
||
|
||
### 🟡 分析功能(2 个问题)
|
||
|
||
| # | 严重度 | 文件 | 行号 | 问题 |
|
||
|---|--------|------|------|------|
|
||
| A1 | **高** | `api/v1/analytics.py` | — | `backports.zoneinfo` 未在 Pipfile 中声明,若 Python 3.8 则 `ImportError`,整个分析路由挂 |
|
||
| A2 | **低** | `api/v1/analytics.py` | — | `Query(regex=...)` 在 Pydantic v2 已弃用,应改 `pattern` |
|
||
|
||
---
|
||
|
||
### 🟡 Ruff 报告的代码缺陷(之前已诊断,此处不重复)
|
||
|
||
共 34 个 lint 错误,其中 3 个 F821(未定义变量 `udt`/`fpt`/`json`)会导致运行时崩溃。
|
||
|
||
---
|
||
|
||
## 二、实施步骤(按优先级排序)
|
||
|
||
### Phase 1:修复致命问题(功能完全不可用)
|
||
|
||
#### 1.1 修复文件锁 — 添加 TTL 过期机制
|
||
|
||
**文件**:`app/core/locks.py`
|
||
|
||
```python
|
||
# 修改 _try_file_lock 方法
|
||
# 在 acquire 时写入时间戳到锁目录内的文件
|
||
# 在 acquire 失败时检查时间戳,若超过 TTL 则强制删除旧锁
|
||
|
||
async def acquire(self) -> bool:
|
||
# Redis 路径不变
|
||
if self._redis:
|
||
return bool(self._redis.set(self._key, "locked", nx=True, ex=self.ttl))
|
||
# 文件锁路径:改用绝对路径 + TTL 检查
|
||
lock_dir = Path(tempfile.gettempdir()) / f"jobdata_lock_{self.name}"
|
||
lock_meta = lock_dir / "meta"
|
||
try:
|
||
lock_dir.mkdir()
|
||
lock_meta.write_text(str(time.time()))
|
||
return True
|
||
except FileExistsError:
|
||
# 检查是否过期
|
||
if lock_meta.exists():
|
||
created = float(lock_meta.read_text())
|
||
if time.time() - created > self.ttl:
|
||
shutil.rmtree(lock_dir) # 强制清理过期锁
|
||
return await self.acquire() # 重试
|
||
return False
|
||
```
|
||
|
||
#### 1.2 修复 Redis 同步阻塞 → 异步
|
||
|
||
**文件**:`app/core/locks.py`
|
||
|
||
将 `redis.Redis` 替换为 `redis.asyncio.Redis`,所有 `self._redis.set/get/delete` 改为 `await self._redis.set/get/delete`。
|
||
|
||
#### 1.3 修复清洗模块 ImportError 风险
|
||
|
||
**文件**:`app/services/cleaning.py:13`
|
||
|
||
```python
|
||
# 修改前
|
||
from jobs_spider.qcwy.search_company_jobs import _extract_items as qcwy_extract_items
|
||
|
||
# 修改后:安全导入 + 降级
|
||
try:
|
||
from jobs_spider.qcwy.search_company_jobs import _extract_items as qcwy_extract_items
|
||
except ImportError:
|
||
logger.warning("qcwy search_company_jobs 模块不可用,公司职位提取功能降级")
|
||
qcwy_extract_items = None
|
||
```
|
||
|
||
#### 1.4 修复清洗中同步阻塞调用
|
||
|
||
**文件**:`app/services/cleaning.py` 多处
|
||
|
||
将所有同步爬虫调用包装为 `asyncio.to_thread`:
|
||
|
||
```python
|
||
# 修改前
|
||
data = self.boss_service.get_job_detail_by_id(target)
|
||
|
||
# 修改后
|
||
data = await asyncio.to_thread(self.boss_service.get_job_detail_by_id, target)
|
||
```
|
||
|
||
涉及的方法:`clean_by_job_id`、`clean_by_company_name`、`clean_boss_company_jobs`、`clean_qcwy_company_jobs`、`clean_zhilian_company_jobs`(共约 12 处调用)。
|
||
|
||
`app/services/company_cleaner.py` 中同样的模式也需要修复(同步爬虫调用包装为 `to_thread`)。
|
||
|
||
#### 1.5 修复 Boss Token 永久缓存问题
|
||
|
||
**文件**:`app/services/cleaning.py:28-36`,`app/services/company_cleaner.py:28-36`
|
||
|
||
```python
|
||
# 修改前
|
||
async def _ensure_boss_token_loaded(self) -> None:
|
||
if self._boss_token_loaded and self.boss_service.login_data.get("mpt"):
|
||
return # 永不刷新
|
||
|
||
# 修改后:添加过期时间检查
|
||
async def _ensure_boss_token_loaded(self) -> None:
|
||
now = time.time()
|
||
if (self._boss_token_loaded
|
||
and self.boss_service.login_data.get("mpt")
|
||
and now - self._token_loaded_at < 3600): # 1小时刷新一次
|
||
return
|
||
token_obj = await BossToken.filter(is_active=True).order_by("-updated_at").first()
|
||
if token_obj:
|
||
self.boss_service.login_data["mpt"] = token_obj.mpt_value
|
||
self._boss_token_loaded = True
|
||
self._token_loaded_at = now
|
||
```
|
||
|
||
---
|
||
|
||
### Phase 2:修复安全问题
|
||
|
||
#### 2.1 凭据迁移(config.py)
|
||
|
||
创建 `.env.example` + 修改 `config.py` 用 `pydantic-settings` 从环境变量读取(详见之前的 ruff-optimization 计划)。
|
||
|
||
#### 2.2 移除 dev 后门
|
||
|
||
**文件**:`app/core/dependency.py:26-28`
|
||
|
||
```python
|
||
# 修改前
|
||
if token == "dev":
|
||
user = await User.filter().first()
|
||
return user
|
||
|
||
# 修改后:仅在开发环境允许
|
||
import os
|
||
if token == "dev" and os.getenv("APP_ENV", "production") == "development":
|
||
user = await User.filter().first()
|
||
return user
|
||
```
|
||
|
||
#### 2.3 阿里云 AK/SK 移入环境变量
|
||
|
||
**文件**:`ecs_full_pipeline.py:487-488`
|
||
|
||
```python
|
||
# 修改前
|
||
ak = "LTAI5tBgW3hAzcnHBkZywxkD"
|
||
sk = "Il7M4bkJvdZIutkJH8pxhuMLrMvj5x"
|
||
|
||
# 修改后
|
||
ak = os.environ["ALIBABA_CLOUD_ACCESS_KEY_ID"]
|
||
sk = os.environ["ALIBABA_CLOUD_ACCESS_KEY_SECRET"]
|
||
```
|
||
|
||
---
|
||
|
||
### Phase 3:修复 IP 告警和分析功能
|
||
|
||
#### 3.1 修复邮件模板字段名
|
||
|
||
**文件**:`app/core/scheduler.py:273`
|
||
|
||
```python
|
||
# 修改前
|
||
f"<td>{a.get('date')}</td>"
|
||
|
||
# 修改后
|
||
f"<td>{a.get('last_report_at', 'N/A')}</td>"
|
||
```
|
||
|
||
#### 3.2 修复 analytics Query 参数弃用警告
|
||
|
||
**文件**:`app/api/v1/analytics.py`
|
||
|
||
```python
|
||
# 修改前
|
||
interval: str = Query("day", regex="^(day|hour|week|month)$")
|
||
|
||
# 修改后
|
||
interval: str = Query("day", pattern="^(day|hour|week|month)$")
|
||
```
|
||
|
||
#### 3.3 修复 zoneinfo 导入
|
||
|
||
确认 Python 版本为 3.13(项目 Pipfile 声明),`zoneinfo` 是标准库,无需 `backports`。可直接删除 try/except,只保留 `from zoneinfo import ZoneInfo`。
|
||
|
||
---
|
||
|
||
### Phase 4:修复 Ruff 34 个 lint 错误
|
||
|
||
```bash
|
||
# 自动修复 22 个
|
||
pipenv run ruff check app/ --fix
|
||
|
||
# 手动修复剩余 12 个(F821 × 3、E722 × 1、E402 × 5、其他 × 3)
|
||
```
|
||
|
||
F821 重点修复:
|
||
- `job.py:348` — `udt` 未定义(需确认应为 `update_date_time`)
|
||
- `job.py:374` — `fpt` 未定义(需确认应为 `first_publish_time`)
|
||
- `crawler/zhilian.py:60` — 添加 `import json`
|
||
|
||
---
|
||
|
||
### Phase 5:代码去重和可维护性优化
|
||
|
||
1. 合并 `job.py` 中 7 个重复的 `_check_*_duplicate` 为 1 个通用方法
|
||
2. 删除死代码 `_check_qcwy_company_duplicate_by_name`
|
||
3. 将 `job.py` 中 `requests.post` 替换为 `httpx.AsyncClient`
|
||
4. 错调度时间:将 `ecs_full_pipeline_job` 偏移 30 分钟,避免与 `stats_job` 重合
|
||
|
||
---
|
||
|
||
## 三、关键文件索引
|
||
|
||
| 文件 | 操作 | Phase | 说明 |
|
||
|------|------|-------|------|
|
||
| `app/core/locks.py` | 重构 | 1 | 文件锁 TTL + Redis 异步化 |
|
||
| `app/services/cleaning.py` | 修复 | 1 | ImportError 防护 + async 阻塞 + Token 刷新 |
|
||
| `app/services/company_cleaner.py` | 修复 | 1 | async 阻塞 + Token 刷新 |
|
||
| `app/core/dependency.py` | 修复 | 2 | dev 后门加环境判断 |
|
||
| `app/settings/config.py` | 重构 | 2 | 凭据移入环境变量 |
|
||
| `ecs_full_pipeline.py` | 修复 | 2 | AK/SK 移入环境变量 |
|
||
| `app/core/scheduler.py` | 修复 | 3 | 邮件字段名 + 调度时间偏移 |
|
||
| `app/api/v1/analytics.py` | 修复 | 3 | regex→pattern + zoneinfo |
|
||
| `app/services/job.py` | 修复 | 4+5 | F821 + E722 + requests→httpx + 去重方法合并 |
|
||
| `app/services/crawler/zhilian.py` | 修复 | 4 | 添加 import json |
|
||
| `web/src/views/cleaning/` | 新建 | 5 | 创建前端清洗页面组件(可选) |
|
||
| `.env.example` | 新建 | 2 | 环境变量模板 |
|
||
|
||
---
|
||
|
||
## 四、风险与缓解
|
||
|
||
| 风险 | 缓解措施 |
|
||
|------|----------|
|
||
| 文件锁改造后旧锁目录残留 | 部署时手动清理 `.lock_*` 目录 |
|
||
| Redis 异步化后连接池配置不同 | 保持相同连接参数,仅换客户端类 |
|
||
| `asyncio.to_thread` 增加线程池压力 | 设置 `max_workers=10` 限制并发 |
|
||
| 凭据迁移后服务启动失败 | 先创建 `.env` 文件再部署 |
|
||
| 前端清洗页面组件工作量大 | 可先做最小 MVP(列表 + 手动触发) |
|
||
|
||
---
|
||
|
||
## 五、执行顺序
|
||
|
||
```
|
||
Phase 1(2h) → 修复致命问题:锁机制 + 清洗模块 + async 阻塞
|
||
Phase 2(1h) → 安全问题:凭据迁移 + dev 后门
|
||
Phase 3(30m) → IP 告警 + 分析功能修复
|
||
Phase 4(30m) → Ruff 34 个 lint 错误
|
||
Phase 5(2h) → 代码去重 + 前端组件(可选)
|
||
```
|
||
|
||
---
|
||
|
||
## SESSION_ID(供 /ccg:execute 使用)
|
||
- CODEX_SESSION: N/A(本次分析由 Claude 本地执行)
|
||
- GEMINI_SESSION: N/A
|