JobData/app/core/scheduler.py
2026-03-22 23:22:30 +08:00

395 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import json
import uuid
from datetime import datetime, timedelta
import subprocess
import sys
from pathlib import Path
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from tortoise.exceptions import OperationalError
from app.core.clickhouse import clickhouse_manager
from app.core.locks import DistributedLock
from app.log import logger
from app.settings.config import settings
from app.models.metrics import ScheduledTaskRun, StatsTotal
# Process-wide scheduler instance: None before start_scheduler() creates it and
# again after shutdown_scheduler() resets it.
scheduler: AsyncIOScheduler | None = None
async def _record_task_run(task_id: str, task_name: str, status: str, started_at: datetime, error: str | None = None):
    """Persist one ScheduledTaskRun row describing a finished job execution.

    The finish time is taken at call time; the duration is the gap between
    ``started_at`` and that moment in whole milliseconds (truncated toward zero).
    A missing ``error`` is stored as an empty string.
    """
    ended = datetime.now()
    elapsed_ms = int((ended.timestamp() - started_at.timestamp()) * 1000)
    row = {
        "task_id": task_id,
        "task_name": task_name,
        "status": status,
        "started_at": started_at,
        "finished_at": ended,
        "duration_ms": elapsed_ms,
        "error": error or "",
    }
    await ScheduledTaskRun.create(**row)
async def stats_job():
    """Every 6 hours: count rows in each ClickHouse job table and report them.

    For each (source, data type, table) entry the job stores the all-time row
    count as a StatsTotal row and also counts rows whose ``created_at`` falls on
    the current day as evaluated by ClickHouse's ``now()``. The combined payload
    is posted to the report endpoint and emailed. The run only proceeds while
    holding the distributed lock; its outcome is recorded via _record_task_run.
    """
    task_id = str(uuid.uuid4())
    started_at = datetime.now()
    task_name = "stats_job"
    lock = DistributedLock(name=task_name, ttl_seconds=600)
    async with lock.context() as acquired:
        if not acquired:
            logger.info("stats_job skipped: lock not acquired")
            return
        try:
            client = await clickhouse_manager.get_client()
            # (source, data type, table name inside the job_data database).
            # Table names are fixed literals, so interpolating them into SQL is safe.
            tables = [
                ("boss", "job", "boss_job"),
                ("qcwy", "job", "qcwy_job"),
                ("zhilian", "job", "zhilian_job"),
            ]
            results: list[dict] = []
            for source, data_type, table in tables:
                total_count = await _scalar_count(client, f"SELECT COUNT() AS cnt FROM job_data.{table}")
                await StatsTotal.create(source=source, table_type=data_type, count=total_count, ts=datetime.now())
                daily_count = await _scalar_count(
                    client,
                    f"SELECT COUNT() AS cnt FROM job_data.{table} "
                    f"WHERE created_at >= toStartOfDay(now()) AND created_at < toStartOfDay(now()) + INTERVAL 1 DAY",
                )
                results.append({
                    "source": source,
                    "type": data_type,
                    "table": table,
                    "total": total_count,
                    "daily_new": daily_count,
                })
            payload = {
                "task_id": task_id,
                "ts": datetime.now().isoformat(),
                "totals": results,
            }
            await _post_with_retry(json.dumps(payload))
            await _send_email("6小时数据统计", payload)
            await _record_task_run(task_id, task_name, "success", started_at)
        except Exception as e:
            # logger.exception keeps the traceback; str(e) alone can be empty
            # (e.g. a bare TimeoutError) and never says where the failure happened.
            logger.exception(f"stats_job failed: {e}")
            await _record_task_run(task_id, task_name, "fail", started_at, error=str(e))


async def _scalar_count(client, sql: str) -> int:
    """Run a single-value COUNT query on *client*; return it as int, or 0 if no row comes back."""
    rows = await client.query(sql)
    return int(rows.result_rows[0][0]) if rows.result_rows else 0
async def ip_alert_job():
    """Every 10 minutes: flag IPs that have not reported within the alert window.

    Loads the IpUploadStats rows whose ``date`` equals today's UTC date. A row
    whose ``last_report_at`` is missing or older than ALERT_WINDOW_MINUTES
    (default 10) is marked "abnormal" (saved only if it was not already) and
    added to an alert payload that is posted to the report endpoint and
    emailed. Runs only while holding the distributed lock; every run is
    recorded via _record_task_run.
    """
    from datetime import timezone

    from app.models.metrics import IpUploadStats  # imported lazily to avoid an import cycle

    task_name = "ip_alert_job"
    task_id = str(uuid.uuid4())
    started_at = datetime.now()
    lock = DistributedLock(name=task_name, ttl_seconds=300)
    async with lock.context() as acquired:
        if not acquired:
            logger.info("ip_alert_job skipped: lock not acquired")
            return
        try:
            window_minutes = getattr(settings, "ALERT_WINDOW_MINUTES", 10)
            # Aware UTC "now" so the comparison below never mixes naive and aware values.
            utc_now = datetime.now(timezone.utc)
            threshold = utc_now - timedelta(minutes=window_minutes)
            # NOTE(review): rows are selected by the UTC calendar date; confirm
            # IpUploadStats.date is also written in UTC.
            rows = await IpUploadStats.filter(date=utc_now.date()).all()

            anomalies: list[dict] = []
            for row in rows:
                reported_at = getattr(row, "last_report_at", None)
                # Naive timestamps are assumed to be UTC.
                # NOTE(review): confirm the DB/ORM actually stores UTC here.
                if reported_at is not None and reported_at.tzinfo is None:
                    reported_at = reported_at.replace(tzinfo=timezone.utc)
                if reported_at is not None and reported_at >= threshold:
                    continue  # reported recently enough
                if getattr(row, "status", "normal") != "abnormal":
                    row.status = "abnormal"
                    await row.save(update_fields=["status"])
                # NOTE(review): every stale IP is reported on every run, including
                # ones already marked abnormal; confirm repeated alerts are intended.
                anomalies.append({
                    "source": row.source,
                    "ip": row.ip,
                    "last_report_at": reported_at.isoformat() if reported_at else None,
                    "window_minutes": window_minutes,
                })

            if anomalies:
                alert = {"task_id": task_id, "ts": datetime.now().isoformat(), "anomalies": anomalies}
                await _post_with_retry(json.dumps(alert))
                await _send_email("IP上报异常告警", alert)

            elapsed = (datetime.now() - started_at).total_seconds()
            logger.info(f"ip_alert_job completed in {elapsed:.2f} seconds")
            await _record_task_run(task_id, task_name, "success", started_at)
        except Exception as e:
            logger.error(f"ip_alert_job failed: {e}")
            await _record_task_run(task_id, task_name, "fail", started_at, error=str(e))
async def ecs_full_pipeline_job():
    """Every 6 hours: run ecs_full_pipeline.py in a subprocess and record the result.

    The script lives three directories above this file (``parents[2]``) and is run
    with the current interpreter in unbuffered mode (``-u``); its stdout and
    stderr are appended to ecs_full_pipeline.log in that same directory. The
    blocking ``subprocess.run`` call executes in a worker thread. Exit code 0 is
    recorded as "success", any other code as "fail" with ``rc=<code>`` as error.
    """
    task_id = str(uuid.uuid4())
    started_at = datetime.now()
    task_name = "ecs_full_pipeline"
    # NOTE(review): the subprocess has no timeout, so a run can outlive this
    # 1800 s lock TTL; confirm that is acceptable for DistributedLock.
    lock = DistributedLock(name=task_name, ttl_seconds=1800)
    async with lock.context() as acquired:
        if not acquired:
            logger.info("ecs_full_pipeline skipped: lock not acquired")
            return
        try:
            root = Path(__file__).resolve().parents[2]
            script = root / "ecs_full_pipeline.py"
            log_path = root / "ecs_full_pipeline.log"
            with open(log_path, "a", encoding="utf-8") as log_file:
                log_file.write(f"\n[定时] 开始执行 pipeline{started_at.isoformat()}\n")
                # The child writes straight to the file descriptor, bypassing this
                # object's buffer; flush first so the header precedes its output.
                log_file.flush()
                proc = await asyncio.to_thread(
                    subprocess.run,
                    [sys.executable, "-u", str(script)],
                    stdout=log_file,
                    stderr=log_file,
                )
            if proc.returncode == 0:
                await _record_task_run(task_id, task_name, "success", started_at)
            else:
                logger.warning(f"ecs_full_pipeline exited with rc={proc.returncode}")
                await _record_task_run(task_id, task_name, "fail", started_at, f"rc={proc.returncode}")
        except Exception as e:
            # Keep the traceback in the log, not just the message.
            logger.exception(f"ecs_full_pipeline failed: {e}")
            await _record_task_run(task_id, task_name, "fail", started_at, error=str(e))
async def _get_active_proxy() -> "str | None":
    """Return the URL of an active ProxyConfig, preferring platform='all'.

    An active row with ``platform == 'all'`` is used when one exists; otherwise
    the first active row ordered by platform is used. Returns None when no proxy
    is active. The chosen proxy is logged with embedded credentials masked.
    """
    from app.models.cleaning import ProxyConfig

    # Look 'all' up explicitly: ordering by platform alone only put 'all' first
    # by alphabetical coincidence (e.g. '51job', or NULLs on some databases,
    # sort ahead of it).
    proxy_obj = await ProxyConfig.filter(is_active=True, platform="all").first()
    if proxy_obj is None:
        proxy_obj = await ProxyConfig.filter(is_active=True).order_by("platform").first()
    if proxy_obj is None:
        return None
    # Never log the raw URL: proxy URLs commonly embed user:password.
    logger.info(f"company_cleaning_job using proxy: {proxy_obj.name} ({_redact_proxy_url(proxy_obj.proxy_url)})")
    return proxy_obj.proxy_url


def _redact_proxy_url(url: str | None) -> str:
    """Return *url* with any ``user:password@`` credentials replaced by ``***@`` for logging."""
    if not url:
        return ""
    scheme, sep, rest = url.partition("://")
    if not sep:
        # No scheme: the whole string is authority[/path].
        scheme, rest = "", url
    authority, slash, path = rest.partition("/")
    if "@" in authority:
        authority = "***@" + authority.rsplit("@", 1)[1]
    return f"{scheme}{sep}{authority}{slash}{path}"
async def company_cleaning_job():
    """Every 5 minutes: collect pending companies and clean a small batch of them.

    Step 1 collects up to 50 pending companies, step 2 looks up an active proxy
    from the database, and step 3 processes up to 30 companies with
    ``max_delay_seconds=1``. The batch sizes were chosen by the original author
    (estimated ~3-5 s per company) so a run fits inside the 5-minute interval.
    Runs only while holding the distributed lock; outcome recorded via
    _record_task_run.
    """
    from app.services.company_cleaner import company_cleaner

    task_name = "company_cleaning_job"
    task_id = str(uuid.uuid4())
    started_at = datetime.now()
    # Lock TTL equals the 5-minute trigger interval because the job runs so often.
    lock = DistributedLock(name=task_name, ttl_seconds=300)
    async with lock.context() as acquired:
        if not acquired:
            logger.info("company_cleaning_job skipped: lock not acquired")
            return
        try:
            logger.info("Running automated company cleaning job...")
            # Step 1 (the original comment mentions a "7-day rule", presumably
            # applied inside collect_pending_companies).
            await company_cleaner.collect_pending_companies(limit=50)
            # Step 2: proxy settings live in the database.
            proxy = await _get_active_proxy()
            # Step 3: small batch, small delay, leaving headroom for step 1.
            await company_cleaner.process_pending_companies(limit=30, max_delay_seconds=1, proxy=proxy)
            elapsed = (datetime.now() - started_at).total_seconds()
            logger.info(f"company_cleaning_job completed in {elapsed:.2f} seconds")
            await _record_task_run(task_id, task_name, "success", started_at)
        except Exception as e:
            logger.error(f"company_cleaning_job failed: {e}")
            await _record_task_run(task_id, task_name, "fail", started_at, error=str(e))
async def daily_cleanup_job():
    """Daily at 00:05: run ``company_cleaner.cleanup_old_records()``.

    The original docstring describes this as purging finished task records.
    Returns quietly (no log line) when another instance holds the lock;
    otherwise the outcome is recorded via _record_task_run.
    """
    from app.services.company_cleaner import company_cleaner

    task_name = "daily_cleanup_job"
    run_id = str(uuid.uuid4())
    run_started = datetime.now()
    lock = DistributedLock(name=task_name, ttl_seconds=3600)
    async with lock.context() as acquired:
        if not acquired:
            return
        try:
            logger.info("Running daily cleanup job...")
            await company_cleaner.cleanup_old_records()
            await _record_task_run(run_id, task_name, "success", run_started)
        except Exception as e:
            logger.error(f"daily_cleanup_job failed: {e}")
            await _record_task_run(run_id, task_name, "fail", run_started, error=str(e))
async def stale_crawl_cleanup_job():
    """Every 10 minutes: demote keywords stuck in 'crawling' for over 30 minutes.

    Rows of BossKeyword, QcwyKeyword and ZhilianKeyword with
    ``crawl_status == 'crawling'`` and ``crawl_started_at`` older than 30 minutes
    are bulk-updated to 'partial'. An OperationalError mentioning the crawl
    columns (i.e. the schema lacks them) skips the run with a warning; any other
    OperationalError propagates. No ScheduledTaskRun record is written.
    """
    from app.models.keyword import BossKeyword, QcwyKeyword, ZhilianKeyword

    task_name = "stale_crawl_cleanup"
    lock = DistributedLock(name=task_name, ttl_seconds=300)
    async with lock.context() as acquired:
        if not acquired:
            return
        cutoff = datetime.now() - timedelta(minutes=30)
        keyword_models = (BossKeyword, QcwyKeyword, ZhilianKeyword)
        try:
            for keyword_model in keyword_models:
                demoted = await keyword_model.filter(
                    crawl_status="crawling",
                    crawl_started_at__lt=cutoff,
                ).update(crawl_status="partial")
                if demoted:
                    logger.info(f"{keyword_model.__name__}: {demoted} stale crawl tasks marked as partial")
        except OperationalError as exc:
            message = str(exc)
            columns_missing = any(column in message for column in ("crawl_status", "crawl_started_at"))
            if not columns_missing:
                raise
            logger.warning(f"stale_crawl_cleanup skipped due to missing keyword crawl columns: {message}")
async def _post_with_retry(body: str):
    """POST a JSON string to ``settings.REPORT_ENDPOINT``, retrying on failure.

    Only warns and returns when REPORT_ENDPOINT is unset. Makes up to
    REPORT_MAX_RETRIES attempts (default 3), each with a REPORT_TIMEOUT-second
    timeout (default 10); any non-2xx status or transport error counts as a
    failed attempt. After failed attempt ``n`` (except the last) it waits
    ``min(5 * n, 15)`` seconds. Never raises: once all attempts fail an error
    is logged and the report is dropped.
    """
    import httpx

    endpoint = getattr(settings, "REPORT_ENDPOINT", "")
    if not endpoint:
        logger.warning("REPORT_ENDPOINT not configured; skip reporting")
        return
    max_retries = getattr(settings, "REPORT_MAX_RETRIES", 3)
    timeout = getattr(settings, "REPORT_TIMEOUT", 10)
    async with httpx.AsyncClient(timeout=timeout) as client:
        for attempt in range(1, max_retries + 1):
            try:
                resp = await client.post(endpoint, headers={"Content-Type": "application/json"}, content=body)
                if 200 <= resp.status_code < 300:
                    return
                # Truncate: an error page can be arbitrarily large.
                raise RuntimeError(f"status={resp.status_code} body={resp.text[:500]}")
            except Exception as e:
                logger.warning(f"report attempt {attempt} failed: {e}")
            # Sleeping after the final attempt would only delay the calling job.
            if attempt < max_retries:
                await asyncio.sleep(min(5 * attempt, 15))
    logger.error(f"report failed after {max_retries} attempts; giving up")
def _build_email_html(subject: str, payload: dict) -> str:
    """Render *payload* as a complete HTML email document.

    - A payload with a "totals" key (as built by stats_job) becomes a table of
      source / type / table / total / new-today counts.
    - A payload with an "anomalies" key (as built by ip_alert_job) becomes a
      table of source / IP / last report time ("N/A" when unknown).
    - Anything else is shown as pretty-printed JSON in a <pre> block.

    Every interpolated value is HTML-escaped: the values originate from
    database rows (e.g. reported IPs and sources) and must not inject markup.
    """
    from html import escape

    ts = payload.get("ts") or datetime.now().isoformat()
    style = (
        "body{font-family:Arial,Helvetica,sans-serif;background:#f7f7f9;color:#333;}"
        "h1{font-size:20px;margin:0 0 10px;}"
        "p.meta{color:#666;font-size:12px;margin:0 0 16px;}"
        "table{border-collapse:collapse;width:100%;background:#fff;border:1px solid #e5e7eb;}"
        "th,td{border:1px solid #e5e7eb;padding:8px;text-align:left;font-size:13px;}"
        "th{background:#f3f4f6;}"
        ".section{margin-top:18px;}"
        ".badge{display:inline-block;background:#2563eb;color:#fff;border-radius:12px;padding:2px 8px;font-size:12px;margin-left:8px;}"
    )
    html_head = (
        f"<h1>{escape(subject)}<span class=\"badge\">{escape(str(ts))}</span></h1>"
        "<p class=\"meta\">自动统计与通知</p>"
    )

    def td(value) -> str:
        # One escaped table cell.
        return f"<td>{escape(str(value))}</td>"

    def document(content: str) -> str:
        return f"<html><head><meta charset='utf-8'><style>{style}</style></head><body>{html_head}{content}</body></html>"

    if "totals" in payload:
        rows = "".join(
            "<tr>" + "".join(td(r.get(key)) for key in ("source", "type", "table", "total", "daily_new")) + "</tr>"
            for r in payload.get("totals") or []
        )
        table = (
            "<table><thead><tr><th>来源</th><th>类型</th><th>表名</th><th>总量</th><th>今日新增</th></tr></thead>"
            f"<tbody>{rows}</tbody></table>"
        )
        return document(table)
    if "anomalies" in payload:
        # The key is always present (value None when unknown), so a .get()
        # default would never apply; fall back with `or` instead.
        rows = "".join(
            "<tr>" + td(a.get("source")) + td(a.get("ip")) + td(a.get("last_report_at") or "N/A") + "</tr>"
            for a in payload.get("anomalies") or []
        )
        table = (
            "<table><thead><tr><th>来源</th><th>IP</th><th>最后上报时间</th></tr></thead>"
            f"<tbody>{rows}</tbody></table>"
        )
        return document(table)
    body = json.dumps(payload, ensure_ascii=False, indent=2)
    pre = (
        "<pre style='background:#111827;color:#e5e7eb;padding:12px;border-radius:6px;overflow:auto;font-size:12px;'>"
        f"{escape(body, quote=False)}</pre>"
    )
    return document(pre)
async def _send_email(subject: str, payload: dict):
    """Send *payload* as an HTML email (rendered by _build_email_html) over SMTP.

    Reads SMTP_HOST / SMTP_USER / SMTP_PASS / SMTP_FROM / SMTP_TO / SMTP_PORT
    (default 587) and the optional SMTP_TIMEOUT (default 30 s) from settings.
    Warns and returns when host, user or password is missing. Port 465 uses
    implicit TLS (SMTP_SSL); other ports connect in plain text and upgrade with
    STARTTLS. The blocking smtplib exchange runs in a worker thread so the
    event loop is not stalled. Failures are logged, never raised.
    """
    import smtplib
    from email.mime.text import MIMEText
    from email.utils import formataddr

    host = getattr(settings, "SMTP_HOST", "")
    user = getattr(settings, "SMTP_USER", "")
    password = getattr(settings, "SMTP_PASS", "")
    sender = getattr(settings, "SMTP_FROM", user)
    recipients = getattr(settings, "SMTP_TO", ["zfc9393@163.com"]) or ["zfc9393@163.com"]
    if isinstance(recipients, str):
        # Accept "a@x.com, b@y.com" as well as a list; joining a plain str below
        # would otherwise put ", " between every character of the address.
        recipients = [addr.strip() for addr in recipients.split(",") if addr.strip()] or ["zfc9393@163.com"]
    if not host or not user or not password:
        logger.warning("SMTP not configured; skip email sending")
        return
    port = getattr(settings, "SMTP_PORT", 587)
    timeout = getattr(settings, "SMTP_TIMEOUT", 30)
    use_ssl = str(port) == "465"

    html = _build_email_html(subject, payload)
    msg = MIMEText(html, "html", "utf-8")
    msg["Subject"] = subject
    msg["From"] = formataddr(("JobData", sender))
    msg["To"] = ", ".join(recipients)

    def _deliver():
        smtp_cls = smtplib.SMTP_SSL if use_ssl else smtplib.SMTP
        # The context manager sends QUIT and closes the socket even when
        # starttls/login/sendmail raise; the timeout bounds a dead server.
        with smtp_cls(host, port, timeout=timeout) as server:
            if not use_ssl:
                server.starttls()
            server.login(user, password)
            server.sendmail(sender, recipients, msg.as_string())

    try:
        await asyncio.to_thread(_deliver)
    except Exception as e:
        logger.error(f"email send failed: {e}")
def start_scheduler():
    """Create the global AsyncIOScheduler, register every periodic job and start it.

    Does nothing if a scheduler already exists.

    Job defaults: missed runs are coalesced into one, up to 3 instances of a
    job may run concurrently, and a run may still start up to 600 s after its
    scheduled time. Note that APScheduler's max_instances caps *concurrent*
    runs rather than queueing them; each job's DistributedLock is what makes
    the surplus instances return early.
    """
    global scheduler
    if scheduler is not None:
        return

    scheduler = AsyncIOScheduler(
        job_defaults={
            "coalesce": True,
            "max_instances": 3,
            "misfire_grace_time": 600,
        }
    )

    # (callable, trigger, job id, extra add_job kwargs), registered in this order.
    job_specs = [
        # every 6 hours, on the hour
        (stats_job, CronTrigger(second=0, minute=0, hour="*/6"), "stats_job", {}),
        # every 6 hours at :30, offset so it does not coincide with stats_job
        (ecs_full_pipeline_job, CronTrigger(second=0, minute=30, hour="*/6"), "ecs_full_pipeline", {}),
        # every 10 minutes
        (ip_alert_job, CronTrigger(second=0, minute="*/10"), "ip_alert_job", {}),
        # every 5 minutes; max_instances stated explicitly (same value as the default)
        (company_cleaning_job, CronTrigger(second=0, minute="*/5"), "company_cleaning_job", {"max_instances": 3}),
        # daily at 00:05
        (daily_cleanup_job, CronTrigger(second=0, minute=5, hour=0), "daily_cleanup_job", {}),
        # every 10 minutes
        (stale_crawl_cleanup_job, CronTrigger(second=0, minute="*/10"), "stale_crawl_cleanup", {}),
    ]
    for func, trigger, job_id, extra in job_specs:
        scheduler.add_job(func, trigger, id=job_id, replace_existing=True, **extra)

    scheduler.start()
def shutdown_scheduler():
    """Stop the global scheduler without waiting for running jobs, then forget it.

    Safe to call when no scheduler has been started.
    """
    global scheduler
    if scheduler is None:
        return
    scheduler.shutdown(wait=False)
    scheduler = None