"""Scheduled background jobs: statistics, IP alerts, ECS pipeline and cleanup.

Every job takes a distributed lock before doing any work so that only one
process runs it at a time, and records its outcome in ``ScheduledTaskRun``.
"""

import asyncio
import html
import json
import smtplib
import subprocess
import sys
import uuid
from datetime import datetime, timedelta, timezone
from email.mime.text import MIMEText
from email.utils import formataddr
from pathlib import Path

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

from app.core.clickhouse import clickhouse_manager
from app.core.locks import DistributedLock
from app.log import logger
from app.settings.config import settings
from app.models.metrics import ScheduledTaskRun, StatsTotal

scheduler: AsyncIOScheduler | None = None

# (source, data type, ClickHouse table name) triples counted by ``stats_job``.
_STATS_TABLES: tuple[tuple[str, str, str], ...] = (
    ("boss", "job", "boss_job"),
    ("qcwy", "job", "qcwy_job"),
    ("zhilian", "job", "zhilian_job"),
    ("boss", "company", "boss_company"),
    ("qcwy", "company", "qcwy_company"),
    ("zhilian", "company", "zhilian_company"),
)

# Fallback recipient list used when ``settings.SMTP_TO`` is missing or empty.
_DEFAULT_RECIPIENTS: list[str] = ["zfc9393@163.com"]


async def _record_task_run(
    task_id: str,
    task_name: str,
    status: str,
    started_at: datetime,
    error: str | None = None,
) -> None:
    """Persist one ``ScheduledTaskRun`` row describing a finished job run.

    Args:
        task_id: Unique id generated for this run.
        task_name: Name of the job.
        status: Outcome label; the jobs in this module pass "success" or "fail".
        started_at: When the run started.
        error: Optional error text; stored as an empty string when absent.
    """
    finished_at = datetime.now()
    # timestamp() is used instead of datetime subtraction so that the
    # computation does not raise if started_at happens to be timezone-aware.
    duration_ms = int((finished_at.timestamp() - started_at.timestamp()) * 1000)
    await ScheduledTaskRun.create(
        task_id=task_id,
        task_name=task_name,
        status=status,
        started_at=started_at,
        finished_at=finished_at,
        duration_ms=duration_ms,
        error=error or "",
    )


async def _query_count(client, sql: str) -> int:
    """Run a single-value COUNT query and return it as int (0 if no rows)."""
    rows = await client.query(sql)
    return int(rows.result_rows[0][0]) if rows.result_rows else 0


async def stats_job():
    """Every 6 hours: count rows in each ClickHouse table and report them."""
    task_id = str(uuid.uuid4())
    started_at = datetime.now()
    task_name = "stats_job"
    lock = DistributedLock(name=task_name, ttl_seconds=600)
    async with lock.context() as acquired:
        if not acquired:
            logger.info("stats_job skipped: lock not acquired")
            return
        try:
            client = await clickhouse_manager.get_client()
            results: list[dict] = []
            # Table names come from the hard-coded _STATS_TABLES constant, so
            # interpolating them into SQL is safe here.
            for source, data_type, table in _STATS_TABLES:
                total_count = await _query_count(
                    client, f"SELECT COUNT() AS cnt FROM job_data.{table}"
                )
                await StatsTotal.create(
                    source=source,
                    table_type=data_type,
                    count=total_count,
                    ts=datetime.now(),
                )
                daily_count = await _query_count(
                    client,
                    f"SELECT COUNT() AS cnt FROM job_data.{table} "
                    f"WHERE created_at >= toStartOfDay(now()) "
                    f"AND created_at < toStartOfDay(now()) + INTERVAL 1 DAY",
                )
                results.append({
                    "source": source,
                    "type": data_type,
                    "table": table,
                    "total": total_count,
                    "daily_new": daily_count,
                })
            payload = {
                "task_id": task_id,
                "ts": datetime.now().isoformat(),
                "totals": results,
            }
            await _post_with_retry(json.dumps(payload))
            await _send_email("6小时数据统计", payload)
            await _record_task_run(task_id, task_name, "success", started_at)
        except Exception as e:
            logger.error(f"stats_job failed: {e}")
            await _record_task_run(task_id, task_name, "fail", started_at, error=str(e))


async def ip_alert_job():
    """Every 10 minutes: flag IPs with no report in the recent window and alert."""
    from app.models.metrics import IpUploadStats  # deferred import to avoid a cycle

    task_id = str(uuid.uuid4())
    started_at = datetime.now()
    task_name = "ip_alert_job"
    lock = DistributedLock(name=task_name, ttl_seconds=300)
    async with lock.context() as acquired:
        if not acquired:
            logger.info("ip_alert_job skipped: lock not acquired")
            return
        try:
            window_minutes = getattr(settings, "ALERT_WINDOW_MINUTES", 10)
            # Use a timezone-aware "now" so comparisons against datetimes
            # loaded from the database do not fail on naive/aware mixing.
            now = datetime.now(timezone.utc)
            threshold = now - timedelta(minutes=window_minutes)
            cutoff = now.date()
            items = await IpUploadStats.filter(date=cutoff).all()
            anomalies: list[dict] = []
            for item in items:
                last_at = getattr(item, "last_report_at", None)
                # A naive value is assumed to be stored in UTC and is made
                # aware before comparing.
                if last_at is not None and last_at.tzinfo is None:
                    last_at = last_at.replace(tzinfo=timezone.utc)
                if last_at is not None and last_at >= threshold:
                    continue
                if getattr(item, "status", "normal") != "abnormal":
                    item.status = "abnormal"
                    await item.save(update_fields=["status"])
                # NOTE(review): the original file's indentation was lost, so it
                # is unclear whether the alert entry was appended for every
                # stale IP (as here) or only on the normal -> abnormal
                # transition. Confirm against the intended alerting policy.
                anomalies.append({
                    "source": item.source,
                    "ip": item.ip,
                    "last_report_at": last_at.isoformat() if last_at else None,
                    "window_minutes": window_minutes,
                })
            if anomalies:
                payload = {
                    "task_id": task_id,
                    "ts": datetime.now().isoformat(),
                    "anomalies": anomalies,
                }
                await _post_with_retry(json.dumps(payload))
                await _send_email("IP上报异常告警", payload)
            duration = (datetime.now() - started_at).total_seconds()
            logger.info(f"ip_alert_job completed in {duration:.2f} seconds")
            await _record_task_run(task_id, task_name, "success", started_at)
        except Exception as e:
            logger.error(f"ip_alert_job failed: {e}")
            await _record_task_run(task_id, task_name, "fail", started_at, error=str(e))


async def ecs_full_pipeline_job():
    """Every 6 hours: run ecs_full_pipeline.py end to end and record the result."""
    task_id = str(uuid.uuid4())
    started_at = datetime.now()
    task_name = "ecs_full_pipeline"
    lock = DistributedLock(name=task_name, ttl_seconds=1800)
    async with lock.context() as acquired:
        if not acquired:
            logger.info("ecs_full_pipeline skipped: lock not acquired")
            return
        try:
            root = Path(__file__).resolve().parents[2]
            script = root / "ecs_full_pipeline.py"
            log = root / "ecs_full_pipeline.log"
            with open(log, "a", encoding="utf-8") as f:
                f.write(f"\n[定时] 开始执行 pipeline:{started_at.isoformat()}\n")
                # Flush our buffered header before the child process writes to
                # the same file descriptor; otherwise the header would land
                # after the child's output.
                f.flush()
                # subprocess.run blocks, so it is pushed to a worker thread.
                proc = await asyncio.to_thread(
                    subprocess.run,
                    [sys.executable, "-u", str(script)],
                    stdout=f,
                    stderr=f,
                    text=True,
                )
            status = "success" if proc.returncode == 0 else "fail"
            await _record_task_run(
                task_id,
                task_name,
                status,
                started_at,
                None if status == "success" else f"rc={proc.returncode}",
            )
        except Exception as e:
            logger.error(f"ecs_full_pipeline failed: {e}")
            await _record_task_run(task_id, task_name, "fail", started_at, error=str(e))


async def company_cleaning_job():
    """Every 5 minutes: collect and clean pending company records."""
    from app.services.company_cleaner import company_cleaner

    task_id = str(uuid.uuid4())
    started_at = datetime.now()
    task_name = "company_cleaning_job"
    # Use a shorter lock TTL since it runs frequently.
    lock = DistributedLock(name=task_name, ttl_seconds=300)
    async with lock.context() as acquired:
        if not acquired:
            logger.info("company_cleaning_job skipped: lock not acquired")
            return
        try:
            logger.info("Running automated company cleaning job...")
            # 1. Collect new data (with 7-day rule). The batch is kept small so
            #    the whole job finishes within the 5-minute interval.
            await company_cleaner.collect_pending_companies(limit=50)
            # 2. Process pending data with a small delay to be polite. The
            #    limit of 30 is sized so this step leaves time for collection
            #    and other work inside the same interval.
            await company_cleaner.process_pending_companies(limit=30, max_delay_seconds=1)
            duration = (datetime.now() - started_at).total_seconds()
            logger.info(f"company_cleaning_job completed in {duration:.2f} seconds")
            await _record_task_run(task_id, task_name, "success", started_at)
        except Exception as e:
            logger.error(f"company_cleaning_job failed: {e}")
            await _record_task_run(task_id, task_name, "fail", started_at, error=str(e))


async def daily_cleanup_job():
    """Daily at 00:05: purge old, completed task records."""
    from app.services.company_cleaner import company_cleaner

    task_id = str(uuid.uuid4())
    started_at = datetime.now()
    task_name = "daily_cleanup_job"
    lock = DistributedLock(name=task_name, ttl_seconds=3600)
    async with lock.context() as acquired:
        if not acquired:
            # Logged for parity with the other jobs in this module.
            logger.info("daily_cleanup_job skipped: lock not acquired")
            return
        try:
            logger.info("Running daily cleanup job...")
            await company_cleaner.cleanup_old_records()
            await _record_task_run(task_id, task_name, "success", started_at)
        except Exception as e:
            logger.error(f"daily_cleanup_job failed: {e}")
            await _record_task_run(task_id, task_name, "fail", started_at, error=str(e))


async def _post_with_retry(body: str):
    """POST a JSON report to ``settings.REPORT_ENDPOINT`` with retries.

    Reporting is best-effort: when every attempt fails the failure is logged
    and the function returns normally instead of raising.

    Args:
        body: Already-serialised JSON document to send.
    """
    import httpx

    endpoint = getattr(settings, "REPORT_ENDPOINT", "")
    if not endpoint:
        logger.warning("REPORT_ENDPOINT not configured; skip reporting")
        return
    max_retries = getattr(settings, "REPORT_MAX_RETRIES", 3)
    timeout = getattr(settings, "REPORT_TIMEOUT", 10)
    async with httpx.AsyncClient(timeout=timeout) as client:
        for attempt in range(1, max_retries + 1):
            try:
                resp = await client.post(
                    endpoint,
                    headers={"Content-Type": "application/json"},
                    content=body,
                )
                if 200 <= resp.status_code < 300:
                    return
                raise RuntimeError(f"status={resp.status_code} body={resp.text}")
            except Exception as e:
                logger.warning(f"report attempt {attempt} failed: {e}")
            # Back off only when another attempt will follow; sleeping after
            # the last failure just delays the caller.
            if attempt < max_retries:
                await asyncio.sleep(min(5 * attempt, 15))
    logger.error(f"report failed after {max_retries} attempts; giving up")


def _html_cell(value: object) -> str:
    """Render one table cell value as HTML-escaped text ("-" for None)."""
    return "-" if value is None else html.escape(str(value))


def _html_table(headers: list[str], rows: list[list[object]]) -> str:
    """Build a ``<table>`` section with the given header labels and rows."""
    head = "".join(f"<th>{html.escape(label)}</th>" for label in headers)
    body = "".join(
        "<tr>" + "".join(f"<td>{_html_cell(v)}</td>" for v in row) + "</tr>"
        for row in rows
    )
    return (
        "<div class='section'><table>"
        f"<thead><tr>{head}</tr></thead>"
        f"<tbody>{body}</tbody>"
        "</table></div>"
    )


def _build_email_html(subject: str, payload: dict) -> str:
    """Build the HTML body of a notification email.

    A payload with "totals" renders a statistics table, one with "anomalies"
    renders an alert table, and anything else is dumped as pretty JSON.

    NOTE(review): the HTML tags of the original template were lost from the
    file; the markup below is reconstructed from the CSS selectors in
    ``style`` and should be checked against the intended layout.

    Args:
        subject: Email subject, also used as the page heading.
        payload: Report data; "ts" is shown as a badge when present.

    Returns:
        A complete HTML document as a string.
    """
    ts = payload.get("ts") or datetime.now().isoformat()
    style = (
        "body{font-family:Arial,Helvetica,sans-serif;background:#f7f7f9;color:#333;}"
        "h1{font-size:20px;margin:0 0 10px;}"
        "p.meta{color:#666;font-size:12px;margin:0 0 16px;}"
        "table{border-collapse:collapse;width:100%;background:#fff;border:1px solid #e5e7eb;}"
        "th,td{border:1px solid #e5e7eb;padding:8px;text-align:left;font-size:13px;}"
        "th{background:#f3f4f6;}"
        ".section{margin-top:18px;}"
        ".badge{display:inline-block;background:#2563eb;color:#fff;border-radius:12px;padding:2px 8px;font-size:12px;margin-left:8px;}"
    )
    html_head = (
        "<html><head><meta charset='utf-8'>"
        f"<style>{style}</style></head><body>"
        f"<h1>{html.escape(str(subject))}"
        f"<span class='badge'>{html.escape(str(ts))}</span></h1>"
        "<p class='meta'>自动统计与通知</p>"
    )
    html_tail = "</body></html>"

    if "totals" in payload:
        table = _html_table(
            ["来源", "类型", "表名", "总量", "今日新增"],
            [
                [r.get("source"), r.get("type"), r.get("table"), r.get("total"), r.get("daily_new")]
                for r in payload.get("totals", [])
            ],
        )
        return f"{html_head}{table}{html_tail}"

    if "anomalies" in payload:
        # Anomaly entries built by ip_alert_job carry "last_report_at", not
        # "date"; accept either so the column is not always empty.
        table = _html_table(
            ["来源", "IP", "日期"],
            [
                [a.get("source"), a.get("ip"), a.get("date") or a.get("last_report_at")]
                for a in payload.get("anomalies", [])
            ],
        )
        return f"{html_head}{table}{html_tail}"

    body = json.dumps(payload, ensure_ascii=False, indent=2)
    pre = f"<div class='section'><pre>{html.escape(body)}</pre></div>"
    return f"{html_head}{pre}{html_tail}"


async def _send_email(subject: str, payload: dict) -> None:
    """Send an HTML notification email; failures are logged, never raised.

    Args:
        subject: Email subject line.
        payload: Report data rendered by ``_build_email_html``.
    """
    host = getattr(settings, "SMTP_HOST", "")
    user = getattr(settings, "SMTP_USER", "")
    password = getattr(settings, "SMTP_PASS", "")
    sender = getattr(settings, "SMTP_FROM", user)
    recipients = getattr(settings, "SMTP_TO", _DEFAULT_RECIPIENTS) or _DEFAULT_RECIPIENTS
    if isinstance(recipients, str):
        # A plain string would otherwise be joined character by character.
        recipients = [r.strip() for r in recipients.split(",") if r.strip()] or _DEFAULT_RECIPIENTS
    if not host or not user or not password:
        logger.warning("SMTP not configured; skip email sending")
        return

    msg = MIMEText(_build_email_html(subject, payload), "html", "utf-8")
    msg["Subject"] = subject
    msg["From"] = formataddr(("JobData", sender))
    msg["To"] = ", ".join(recipients)
    port = getattr(settings, "SMTP_PORT", 587)
    timeout = getattr(settings, "SMTP_TIMEOUT", 30)

    def _deliver() -> None:
        # The context manager closes the connection even when a step fails.
        with smtplib.SMTP(host, port, timeout=timeout) as server:
            server.starttls()
            server.login(user, password)
            server.sendmail(sender, recipients, msg.as_string())

    try:
        # smtplib is blocking; run it in a worker thread so the event loop
        # (and every other scheduled job) keeps running while mail is sent.
        await asyncio.to_thread(_deliver)
    except Exception as e:
        logger.error(f"email send failed: {e}")


def start_scheduler():
    """Create the global scheduler, register all jobs and start it.

    Calling this again while a scheduler already exists is a no-op.
    """
    global scheduler
    if scheduler is not None:
        return
    # Job defaults tolerate late starts to reduce "missed run" warnings.
    # max_instances only lets runs overlap/queue inside the scheduler; real
    # mutual exclusion is enforced by the distributed lock inside each job.
    scheduler = AsyncIOScheduler(
        job_defaults={
            "coalesce": True,  # merge several pending runs into one
            "max_instances": 3,  # allow up to 3 overlapping instances
            "misfire_grace_time": 600,  # accept up to 600 s (10 min) of delay
        }
    )
    # Every 6 hours: statistics report.
    scheduler.add_job(
        stats_job,
        CronTrigger(second=0, minute=0, hour="*/6"),
        id="stats_job",
        replace_existing=True,
    )
    # Every 6 hours: full ECS pipeline.
    scheduler.add_job(
        ecs_full_pipeline_job,
        CronTrigger(second=0, minute=0, hour="*/6"),
        id="ecs_full_pipeline",
        replace_existing=True,
    )
    # Every 10 minutes: IP alerting.
    scheduler.add_job(
        ip_alert_job,
        CronTrigger(second=0, minute="*/10"),
        id="ip_alert_job",
        replace_existing=True,
    )
    # Every 5 minutes: automated company cleaning.
    scheduler.add_job(
        company_cleaning_job,
        CronTrigger(second=0, minute="*/5"),
        id="company_cleaning_job",
        replace_existing=True,
        max_instances=3,  # same value as the default, stated explicitly
    )
    # Daily at 00:05: purge historical records.
    scheduler.add_job(
        daily_cleanup_job,
        CronTrigger(second=0, minute=5, hour=0),
        id="daily_cleanup_job",
        replace_existing=True,
    )
    scheduler.start()


def shutdown_scheduler():
    """Stop the global scheduler without waiting for running jobs."""
    global scheduler
    if scheduler is not None:
        scheduler.shutdown(wait=False)
        scheduler = None