# Architecture

**Analysis Date:** 2026-03-21

## Pattern Overview

**Overall:** Layered monolith with embedded pipeline architecture

**Key Characteristics:**
- FastAPI async backend with dual-database persistence (MySQL for RBAC/metadata, ClickHouse for job/company analytics data)
- External crawlers (`jobs_spider/`, `spiderJobs/`) run as independent processes and push data to the backend API via HTTP POST; no shared code path at runtime
- RBAC permission model enforced at router-level via FastAPI `Depends`
- APScheduler drives all recurring automation (cleaning, ECS orchestration, alerting) within the main process
- Registry pattern (`app/services/ingest/registry.py`) maps (platform, channel, data_type) tuples to ClickHouse table configs + dedup rules

## Layers

**Router Layer:**
- Purpose: Parse HTTP request, validate via Pydantic schema, delegate to service or controller, return JSON envelope
- Location: `app/api/v1/`
- Contains: One sub-package per domain (e.g., `job/`, `cleaning/`, `analytics/`, `keyword/`)
- Depends on: Services, Controllers, `app/core/dependency.py` for auth
- Used by: FastAPI application via `app/api/v1/__init__.py` → `app/__init__.py`

**Service Layer:**
- Purpose: Business logic, orchestration across repos/models, side-effect coordination
- Location: `app/services/`
- Contains: `IngestService`, `AnalyticsService`, `CleaningService`, `CompanyCleaner`, `CompanyJobsSyncService`, platform crawler service wrappers (`crawler/boss.py`, `crawler/qcwy.py`, `crawler/zhilian.py`)
- Depends on: Repositories, ORM models, `clickhouse_manager`, external HTTP clients
- Used by: Routers, APScheduler jobs

**Controller Layer (RBAC domain operations):**
- Purpose: Thin CRUD orchestration for MySQL-backed entities (User, Role, API, Menu, Dept, etc.)
- Location: `app/controllers/`
- Contains: `user_controller`, `api_controller`, `role_controller`, `menu_controller`, `dept_controller`, `keyword_controller`, `proxy_controller`, `token_controller`, `cleaning_controller`, `company_controller`
- Depends on: Tortoise-ORM models
- Used by: Routers

**Repository Layer:**
- Purpose: Encapsulate raw ClickHouse query execution behind typed methods
- Location: `app/repositories/`
- Contains: `ClickHouseBaseRepo` (generic query/insert), `JobAnalyticsRepo` (trend, distribution, count queries against `job_analytics` view)
- Depends on: `clickhouse_connect.driver.AsyncClient`
- Used by: `AnalyticsService`

**Model Layer:**
- Purpose: ORM schema definitions for MySQL, migrated via Aerich
- Location: `app/models/`
- Contains: `admin.py` (User, Role, Api, Menu, Dept, AuditLog), `token.py` (BossToken), `cleaning.py` (CleaningTask), `metrics.py` (ScheduledTaskRun, StatsTotal, IpUploadStats), `company.py` (CompanyCleaningQueue), `keyword.py` (BossKeyword, QcwyKeyword, ZhilianKeyword)
- Depends on: `tortoise-orm`
- Used by: Controllers, Services, Scheduler jobs

**Core Infrastructure Layer:**
- Purpose: App wiring, connection management, middleware, cross-cutting utilities
- Location: `app/core/`
- Contains: `init_app.py` (lifespan bootstrap), `scheduler.py` (APScheduler jobs), `clickhouse.py` (ClickHouseManager singleton), `clickhouse_init.py` (ClickHouse DDL), `dependency.py` (AuthControl, PermissionControl), `locks.py` (DistributedLock), `middlewares.py` (BackGroundTaskMiddleware, HttpAuditLogMiddleware, IpTrackingMiddleware), `exceptions.py`, `crud.py`, `ip_tracking.py`
- Depends on: Everything below
- Used by: `app/__init__.py`, all layers above

**Ingest Sub-system:**
- Purpose: Platform-agnostic data ingestion pipeline with registry-driven routing, dedup, and remote push
- Location: `app/services/ingest/`
- Contains: `service.py` (IngestService), `registry.py` (PlatformConfig registry), `dedup.py` (batch dedup filter),
  `remote_push.py` (async HTTP push to secondary target), `configs/` (per-platform PlatformConfig registrations)
- Depends on: `clickhouse_connect`, registry configs
- Used by: `app/api/v1/job/job.py` router

**Spider Crawler Service Wrappers (in-process):**
- Purpose: Wrap platform HTTP clients so backend scheduler can call crawlers directly (company cleaning, company search)
- Location: `app/services/crawler/`
- Contains: `_base.py` (BaseFetcher, BaseSearcher, ApiResult), `_http_client.py`, `_boss_client.py`, `_boss_api.py`, `_boss_sign.py`, `_zhilian_client.py`, `_zhilian_api.py`, `_zhilian_sign.py`, `_job51_client.py`, `_job51_api.py`, `_job51_sign.py`, `boss.py`, `qcwy.py`, `zhilian.py`
- Depends on: `requests` (sync), platform sign/auth modules
- Used by: `CompanyCleaner`, `CompanyController`

**External Spider Processes:**
- Purpose: Standalone crawlers running on ECS nodes; push data to backend via `POST /api/v1/universal/data/batch-store-async`
- Location: `jobs_spider/boss/boos_api.py`, `jobs_spider/qcwy/qcwy.py`, `jobs_spider/zhilian/zhilian_single.py`, `spiderJobs/`
- Depends on: `requests`, `loguru`, `PyExecJS`; no shared Python imports with `app/`
- Used by: Scheduled via `ecs_full_pipeline.py` or manual launch

## Data Flow

**Crawler → Storage Flow:**
1. External spider (`jobs_spider/boss/boos_api.py`) fetches keyword from `GET /api/v1/keyword/available`
2. Spider calls Boss/QCWY/Zhilian platform API with rotating proxy/token
3. Spider POSTs raw JSON batch to `POST /api/v1/universal/data/batch-store-async`
4. Router (`app/api/v1/job/job.py`) delegates to `IngestService.store_batch()`
5. `IngestService` looks up `PlatformConfig` from registry (`app/services/ingest/registry.py`)
6. Dedup filter queries ClickHouse for existing records (`app/services/ingest/dedup.py`)
7. New rows inserted to ClickHouse table (e.g., `job_data.boss_job`) via `clickhouse_connect`
8.
   Optional async push to secondary remote endpoint via `remote_push.py`

**Analytics Query Flow:**
1. Frontend calls `GET /api/v1/analytics/volume-trend?interval=day&from_dt=...`
2. Router (`app/api/v1/analytics.py`) creates `AnalyticsService` with `clickhouse_manager` client
3. `AnalyticsService` delegates to `JobAnalyticsRepo.get_volume_trend()`
4. `JobAnalyticsRepo` queries the `job_data.job_analytics` VIEW (UNION ALL across three platform job tables)
5. Returns time-bucketed counts grouped by source

**RBAC Authentication Flow:**
1. Client sends `POST /api/v1/base/access_token` with credentials and receives a JWT (HS256, 7-day expiry)
2. On each subsequent request, `AuthControl.is_authed()` verifies the JWT from the `token` header
3. `PermissionControl.has_permission()` loads the user's roles, then each role's API list
4. Compares the request's `(method, path)` tuple against the allowed API set
5. Superusers bypass the permission check

**Company Cleaning Flow (Scheduled):**
1. `company_cleaning_job()` in `app/core/scheduler.py` fires every 5 minutes
2. `CompanyCleaner.collect_pending_companies(limit=50)` queries ClickHouse job tables for company IDs not yet in `pending_company` table
3. `CompanyCleaner.process_pending_companies(limit=30)` fetches company details from platform crawlers (BossService, QcwyService, ZhilianService)
4. Cleaned company data stored via `company_storage` to ClickHouse `boss_company` / `qcwy_company` / `zhilian_company`
5.
   Run protected by `DistributedLock` (file-based fallback, optional Redis)

**State Management:**
- MySQL (via Tortoise-ORM): User sessions, RBAC entities, keyword crawl state, tokens, task run records
- ClickHouse: All raw job/company JSON data + analytics views; no ORM, raw SQL via `clickhouse_connect`
- Context variable `CTX_USER_ID` (Python `contextvars.ContextVar`) carries authenticated user ID within request scope (`app/core/ctx.py`)
- File-based startup lock (`.startup_lock/` dir) prevents multi-worker seed data collisions

## Key Abstractions

**PlatformConfig (Ingest Registry):**
- Purpose: Maps `(platform, channel, data_type)` to target ClickHouse table and dedup field extractors
- Examples: `app/services/ingest/registry.py`, `app/services/ingest/configs/`
- Pattern: Immutable `@dataclass(frozen=True)`, registered at module import time via `register()`

**DistributedLock:**
- Purpose: Prevents concurrent execution of scheduled jobs across multiple Uvicorn workers
- Examples: `app/core/locks.py`
- Pattern: Redis SET NX with TTL; falls back to filesystem directory creation with TTL metadata

**ClickHouseManager:**
- Purpose: Singleton lazy-init connection wrapper around `clickhouse_connect.AsyncClient`
- Examples: `app/core/clickhouse.py`
- Pattern: Global module-level instance `clickhouse_manager`, injected into services at call time via `await clickhouse_manager.get_client()`

**BaseFetcher / BaseSearcher:**
- Purpose: Abstract base for platform-specific API clients within the in-process crawler wrappers
- Examples: `app/services/crawler/_base.py`
- Pattern: Template method: subclasses implement `_build_params()`, base handles HTTP invocation and `ApiResult` parsing

**BaseModel (Tortoise):**
- Purpose: Common ORM base with `BigIntField pk`, `to_dict()` async serializer with M2M support
- Examples: `app/models/base.py`
- Pattern: Mixed-in with `TimestampMixin` (created_at, updated_at auto fields)

## Entry Points

**Application Entry:**
- Location: `run.py`
-
  Triggers: `python run.py`; reads `APP_HOST`, `APP_PORT`, `UVICORN_WORKERS` env vars; calls `uvicorn.run("app:app")`
- Responsibilities: Configure uvicorn logging format, start ASGI server

**FastAPI App Factory:**
- Location: `app/__init__.py` → `create_app()`
- Triggers: Uvicorn import of `app:app`
- Responsibilities: Register middlewares, exception handlers, routers, static files; define lifespan

**Lifespan Bootstrap:**
- Location: `app/__init__.py` (lifespan context manager) + `app/core/init_app.py` (`init_data()`)
- Triggers: FastAPI startup event
- Responsibilities: Tortoise-ORM init + schema generation → Aerich migrations → seed data (superuser, menus, APIs, roles) → ClickHouse table init → APScheduler start

**Router Aggregation:**
- Location: `app/api/v1/__init__.py`
- Triggers: Imported by `app/api/__init__.py` → `app/__init__.py`
- Responsibilities: Mount all domain routers under `/api/v1/` with appropriate `DependPermission` guards

**ECS Pipeline:**
- Location: `ecs_full_pipeline.py`
- Triggers: Manual execution or scheduled via `scheduler.py` (`ecs_full_pipeline_job`) every 6 hours
- Responsibilities: Create Alibaba Cloud ECS spot instances, push spider scripts, execute crawlers, terminate instances

## Error Handling

**Strategy:** Exception handler registration at FastAPI app level; structured error propagation up through layers

**Patterns:**
- Tortoise `DoesNotExist` → 404 via `DoesNotExistHandle` (registered in `app/core/init_app.py`)
- Pydantic `RequestValidationError` → 422 with field-level detail via `RequestValidationHandle`
- `IntegrityError` (DB constraint) → 400 via `IntegrityHandle`
- Custom `HTTPException` (from `app/core/exceptions.py`) → structured JSON error via `HttpExcHandle`
- Service layer exceptions logged with `loguru` then re-raised or returned as `{"success": False, "error": "..."}`
- Scheduled jobs catch all exceptions, record failure via `_record_task_run()` to `ScheduledTaskRun` table, then continue

## Cross-Cutting Concerns

**Logging:** `loguru`, imported from `app/log/__init__.py`; structured `logger.info/error/warning` calls throughout; external spider logs to rotating daily files in `jobs_spider/*/logs/`

**Validation:** Pydantic v2 schemas in `app/schemas/` validate all inbound HTTP request bodies; ClickHouse inserts use registry-defined column extractors for type safety

**Authentication:** JWT HS256 tokens issued by `/api/v1/base/access_token`; verified per-request by `AuthControl.is_authed()` injected via `DependAuth` / `DependPermission`; dev bypass (`token: dev`) available when `APP_ENV=development`

**Audit Logging:** `HttpAuditLogMiddleware` records all non-excluded mutating requests (method, path, user, request args, response body, timing) to MySQL `auditlog` table

---

*Architecture analysis: 2026-03-21*