Architecture
Analysis Date: 2026-03-21
Pattern Overview
Overall: Layered monolith with embedded pipeline architecture
Key Characteristics:
- FastAPI async backend with dual-database persistence (MySQL for RBAC/metadata, ClickHouse for job/company analytics data)
- External crawlers (jobs_spider/, spiderJobs/) run as independent processes and push data to the backend API via HTTP POST; no shared code path at runtime
- RBAC permission model enforced at router level via FastAPI Depends
- APScheduler drives all recurring automation (cleaning, ECS orchestration, alerting) within the main process
- Registry pattern (app/services/ingest/registry.py) maps (platform, channel, data_type) tuples to ClickHouse table configs + dedup rules
Layers
Router Layer:
- Purpose: Parse HTTP request, validate via Pydantic schema, delegate to service or controller, return JSON envelope
- Location: app/api/v1/
- Contains: One sub-package per domain (e.g., job/, cleaning/, analytics/, keyword/)
- Depends on: Services, Controllers, app/core/dependency.py for auth
- Used by: FastAPI application via app/api/v1/__init__.py → app/__init__.py
Service Layer:
- Purpose: Business logic, orchestration across repos/models, side-effect coordination
- Location: app/services/
- Contains: IngestService, AnalyticsService, CleaningService, CompanyCleaner, CompanyJobsSyncService, platform crawler service wrappers (crawler/boss.py, crawler/qcwy.py, crawler/zhilian.py)
- Depends on: Repositories, ORM models, clickhouse_manager, external HTTP clients
- Used by: Routers, APScheduler jobs
Controller Layer (RBAC domain operations):
- Purpose: Thin CRUD orchestration for MySQL-backed entities (User, Role, API, Menu, Dept, etc.)
- Location: app/controllers/
- Contains: user_controller, api_controller, role_controller, menu_controller, dept_controller, keyword_controller, proxy_controller, token_controller, cleaning_controller, company_controller
- Depends on: Tortoise-ORM models
- Used by: Routers
Repository Layer:
- Purpose: Encapsulate raw ClickHouse query execution behind typed methods
- Location: app/repositories/
- Contains: ClickHouseBaseRepo (generic query/insert), JobAnalyticsRepo (trend, distribution, count queries against job_analytics view)
- Depends on: clickhouse_connect.driver.AsyncClient
- Used by: AnalyticsService
Model Layer:
- Purpose: ORM schema definitions for MySQL, migrated via Aerich
- Location: app/models/
- Contains: admin.py (User, Role, Api, Menu, Dept, AuditLog), token.py (BossToken), cleaning.py (CleaningTask), metrics.py (ScheduledTaskRun, StatsTotal, IpUploadStats), company.py (CompanyCleaningQueue), keyword.py (BossKeyword, QcwyKeyword, ZhilianKeyword)
- Depends on: tortoise-orm
- Used by: Controllers, Services, Scheduler jobs
Core Infrastructure Layer:
- Purpose: App wiring, connection management, middleware, cross-cutting utilities
- Location: app/core/
- Contains: init_app.py (lifespan bootstrap), scheduler.py (APScheduler jobs), clickhouse.py (ClickHouseManager singleton), clickhouse_init.py (ClickHouse DDL), dependency.py (AuthControl, PermissionControl), locks.py (DistributedLock), middlewares.py (BackGroundTaskMiddleware, HttpAuditLogMiddleware, IpTrackingMiddleware), exceptions.py, crud.py, ip_tracking.py
- Depends on: Everything below
- Used by: app/__init__.py, all layers above
Ingest Sub-system:
- Purpose: Platform-agnostic data ingestion pipeline with registry-driven routing, dedup, and remote push
- Location: app/services/ingest/
- Contains: service.py (IngestService), registry.py (PlatformConfig registry), dedup.py (batch dedup filter), remote_push.py (async HTTP push to secondary target), configs/ (per-platform PlatformConfig registrations)
- Depends on: clickhouse_connect, registry configs
- Used by: app/api/v1/job/job.py router
Spider Crawler Service Wrappers (in-process):
- Purpose: Wrap platform HTTP clients so backend scheduler can call crawlers directly (company cleaning, company search)
- Location: app/services/crawler/
- Contains: _base.py (BaseFetcher, BaseSearcher, ApiResult), _http_client.py, _boss_client.py, _boss_api.py, _boss_sign.py, _zhilian_client.py, _zhilian_api.py, _zhilian_sign.py, _job51_client.py, _job51_api.py, _job51_sign.py, boss.py, qcwy.py, zhilian.py
- Depends on: requests (sync), platform sign/auth modules
- Used by: CompanyCleaner, CompanyController
External Spider Processes:
- Purpose: Standalone crawlers running on ECS nodes; push data to backend via POST /api/v1/universal/data/batch-store-async
- Location: jobs_spider/boss/boos_api.py, jobs_spider/qcwy/qcwy.py, jobs_spider/zhilian/zhilian_single.py, spiderJobs/
- Depends on: requests, loguru, PyExecJS; no shared Python imports with app/
- Used by: Scheduled via ecs_full_pipeline.py or manual launch
Data Flow
Crawler → Storage Flow:
- External spider (jobs_spider/boss/boos_api.py) fetches keyword from GET /api/v1/keyword/available
- Spider calls Boss/QCWY/Zhilian platform API with rotating proxy/token
- Spider POSTs raw JSON batch to POST /api/v1/universal/data/batch-store-async
- Router (app/api/v1/job/job.py) delegates to IngestService.store_batch()
- IngestService looks up PlatformConfig from registry (app/services/ingest/registry.py)
- Dedup filter queries ClickHouse for existing records (app/services/ingest/dedup.py)
- New rows inserted to ClickHouse table (e.g., job_data.boss_job) via clickhouse_connect
- Optional async push to secondary remote endpoint via remote_push.py
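The dedup step in this flow can be sketched in plain Python. This is a minimal illustration, not the code in dedup.py: the function name filter_new_records and the fetch_existing_keys callable (a stand-in for the ClickHouse lookup) are assumptions made for the example.

```python
from typing import Callable, Dict, Iterable, List, Set

Record = Dict[str, object]


def filter_new_records(
    batch: List[Record],
    key_of: Callable[[Record], str],
    fetch_existing_keys: Callable[[Set[str]], Iterable[str]],
) -> List[Record]:
    """Drop records whose dedup key is already stored or repeated in the batch.

    fetch_existing_keys is a hypothetical stand-in for the ClickHouse query:
    it receives the candidate keys and returns those that already exist.
    """
    keys = [key_of(record) for record in batch]
    seen = set(fetch_existing_keys(set(keys)))
    fresh: List[Record] = []
    for record, key in zip(batch, keys):
        if key in seen:
            continue  # already stored, or a duplicate earlier in this batch
        seen.add(key)
        fresh.append(record)
    return fresh


# Usage with an in-memory set standing in for the ClickHouse table.
stored = {"a"}
batch = [{"id": "a"}, {"id": "b"}, {"id": "b"}, {"id": "c"}]
new_rows = filter_new_records(batch, lambda r: str(r["id"]), lambda ks: stored & ks)
```

Querying once per batch with the full key set, rather than once per record, keeps the number of round trips to the database constant regardless of batch size.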
Analytics Query Flow:
- Frontend calls GET /api/v1/analytics/volume-trend?interval=day&from_dt=...
- Router (app/api/v1/analytics.py) creates AnalyticsService with clickhouse_manager client
- AnalyticsService delegates to JobAnalyticsRepo.get_volume_trend()
- JobAnalyticsRepo queries the job_data.job_analytics VIEW (UNION ALL across three platform job tables)
- Returns time-bucketed counts grouped by source
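To make "time-bucketed counts grouped by source" concrete, the aggregation can be imitated in plain Python. The real work happens in ClickHouse SQL against the view; the function below, its row shape, the source values, and the "hour" interval are all assumptions for illustration (only interval=day appears above).

```python
from collections import Counter
from datetime import datetime
from typing import Any, Dict, Iterable, List, Tuple

# Bucket formats per interval; "hour" is a hypothetical extra option.
_BUCKET_FORMATS = {"day": "%Y-%m-%d", "hour": "%Y-%m-%d %H:00"}


def volume_trend(
    rows: Iterable[Tuple[datetime, str]], interval: str = "day"
) -> List[Dict[str, Any]]:
    """Count (timestamp, source) rows per time bucket and source."""
    try:
        fmt = _BUCKET_FORMATS[interval]
    except KeyError:
        raise ValueError(f"unsupported interval: {interval!r}") from None
    counts = Counter((ts.strftime(fmt), source) for ts, source in rows)
    return [
        {"bucket": bucket, "source": source, "count": n}
        for (bucket, source), n in sorted(counts.items())
    ]


rows = [
    (datetime(2026, 3, 20, 9), "boss"),
    (datetime(2026, 3, 20, 17), "boss"),
    (datetime(2026, 3, 20, 11), "zhilian"),
    (datetime(2026, 3, 21, 8), "boss"),
]
trend = volume_trend(rows)
```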
RBAC Authentication Flow:
- Client sends POST /api/v1/base/access_token with credentials
- AuthControl.is_authed() verifies JWT (HS256, 7-day expiry) from token header
- PermissionControl.has_permission() loads user's roles → role's API list
- Compares (method, path) tuple against allowed API set
- Superusers bypass permission check
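The permission comparison at the end of this flow reduces to a set-membership test. The sketch below uses plain dataclasses in place of the Tortoise models; the User and Role shapes and the example path are assumptions, and JWT verification is left out.

```python
from dataclasses import dataclass, field
from typing import FrozenSet, List, Tuple

ApiKey = Tuple[str, str]  # (HTTP method, path)


@dataclass
class Role:
    name: str
    apis: FrozenSet[ApiKey] = frozenset()


@dataclass
class User:
    username: str
    is_superuser: bool = False
    roles: List[Role] = field(default_factory=list)


def has_permission(user: User, method: str, path: str) -> bool:
    """Return True if any of the user's roles grants (method, path)."""
    if user.is_superuser:
        return True  # superusers bypass the check entirely
    allowed = set()
    for role in user.roles:
        allowed |= role.apis
    return (method.upper(), path) in allowed


# Hypothetical role granting read access to one endpoint.
viewer = Role("viewer", frozenset({("GET", "/api/v1/analytics/volume-trend")}))
alice = User("alice", roles=[viewer])
root = User("root", is_superuser=True)
```

Here has_permission(alice, "get", "/api/v1/analytics/volume-trend") is granted, while a POST to the same path is refused for alice and allowed for root.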
Company Cleaning Flow (Scheduled):
- company_cleaning_job() in app/core/scheduler.py fires every 5 minutes
- CompanyCleaner.collect_pending_companies(limit=50) queries ClickHouse job tables for company IDs not yet in pending_company table
- CompanyCleaner.process_pending_companies(limit=30) fetches company details from platform crawlers (BossService, QcwyService, ZhilianService)
- Cleaned company data stored via company_storage to ClickHouse boss_company / qcwy_company / zhilian_company
- Run protected by DistributedLock (file-based fallback, optional Redis)
State Management:
- MySQL (via Tortoise-ORM): User sessions, RBAC entities, keyword crawl state, tokens, task run records
- ClickHouse: All raw job/company JSON data + analytics views; no ORM, raw SQL via clickhouse_connect
- Context variable CTX_USER_ID (Python contextvars.ContextVar) carries authenticated user ID within request scope (app/core/ctx.py)
- File-based startup lock (.startup_lock/ dir) prevents multi-worker seed data collisions
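The request-scoped behaviour of a ContextVar can be shown in a few lines. This is a sketch of the general mechanism, not the contents of app/core/ctx.py: the variable name string, the default, and the handle_request function are assumptions.

```python
import asyncio
from contextvars import ContextVar
from typing import List, Optional

# Hypothetical declaration; the real default and type may differ.
CTX_USER_ID: ContextVar[Optional[int]] = ContextVar("user_id", default=None)


def current_user_id() -> Optional[int]:
    """Read the user ID from anywhere in the call stack without passing it down."""
    return CTX_USER_ID.get()


async def handle_request(user_id: int) -> Optional[int]:
    """Stand-in for an authenticated request handler."""
    token = CTX_USER_ID.set(user_id)
    try:
        await asyncio.sleep(0)  # yield so the two requests interleave
        return current_user_id()
    finally:
        CTX_USER_ID.reset(token)


async def main() -> List[Optional[int]]:
    # gather() runs each coroutine in its own task, and each task gets its own
    # copy of the context, so concurrent requests do not see each other's value.
    return list(await asyncio.gather(handle_request(1), handle_request(2)))


results = asyncio.run(main())
```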
Key Abstractions
PlatformConfig (Ingest Registry):
- Purpose: Maps (platform, channel, data_type) to target ClickHouse table and dedup field extractors
- Examples: app/services/ingest/registry.py, app/services/ingest/configs/
- Pattern: Immutable @dataclass(frozen=True), registered at module import time via register()
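A registry of this shape might look like the following. Only the frozen dataclass, the (platform, channel, data_type) key, and register() come from the description above; the field names, the lookup() helper, the "app" channel, and the job_id dedup field are hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Tuple

RegistryKey = Tuple[str, str, str]  # (platform, channel, data_type)


@dataclass(frozen=True)
class PlatformConfig:
    """Hypothetical field layout; the real class may carry more settings."""
    platform: str
    channel: str
    data_type: str
    table: str                         # target ClickHouse table
    dedup_key: Callable[[dict], str]   # extracts the dedup value from a raw record


_REGISTRY: Dict[RegistryKey, PlatformConfig] = {}


def register(config: PlatformConfig) -> PlatformConfig:
    """Add a config to the registry, rejecting duplicate keys."""
    key = (config.platform, config.channel, config.data_type)
    if key in _REGISTRY:
        raise ValueError(f"duplicate registration for {key}")
    _REGISTRY[key] = config
    return config


def lookup(platform: str, channel: str, data_type: str) -> PlatformConfig:
    """Resolve the config for an incoming batch (hypothetical helper)."""
    try:
        return _REGISTRY[(platform, channel, data_type)]
    except KeyError:
        raise LookupError(f"no config for {(platform, channel, data_type)}") from None


# Runs at import time, so importing a configs module is enough to register it.
BOSS_JOB = register(
    PlatformConfig("boss", "app", "job", "job_data.boss_job",
                   dedup_key=lambda record: str(record["job_id"]))
)
```

Freezing the dataclass means a config cannot be mutated after registration, so every request that resolves the same key sees the same routing and dedup rules.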
DistributedLock:
- Purpose: Prevents concurrent execution of scheduled jobs across multiple Uvicorn workers
- Examples: app/core/locks.py
- Pattern: Redis SET NX with TTL; falls back to filesystem directory creation with TTL metadata
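The filesystem fallback relies on directory creation being atomic: only one process can create a given directory. A rough sketch of that idea follows; the class name DirectoryLock, the meta.json file, and its fields are assumptions, and the Redis path is omitted.

```python
import json
import os
import shutil
import time
from pathlib import Path


class DirectoryLock:
    """Lock held by whoever manages to create the lock directory."""

    def __init__(self, path: str, ttl_seconds: float) -> None:
        self.path = Path(path)
        self.ttl = ttl_seconds

    def acquire(self) -> bool:
        if self._try_mkdir():
            return True
        if self._is_expired():
            # Reclaim a lock whose holder never released it (e.g. it crashed).
            # Two reclaimers can race here; a production lock needs more care.
            shutil.rmtree(self.path, ignore_errors=True)
            return self._try_mkdir()
        return False

    def release(self) -> None:
        shutil.rmtree(self.path, ignore_errors=True)

    def _try_mkdir(self) -> bool:
        try:
            os.mkdir(self.path)  # atomic: fails if the directory already exists
        except FileExistsError:
            return False
        meta = {"expires_at": time.time() + self.ttl, "pid": os.getpid()}
        (self.path / "meta.json").write_text(json.dumps(meta))
        return True

    def _is_expired(self) -> bool:
        try:
            meta = json.loads((self.path / "meta.json").read_text())
        except (OSError, ValueError):
            return False  # metadata missing or unreadable: treat the lock as held
        return time.time() >= meta["expires_at"]
```

A scheduled job would call acquire() first, skip the run when it returns False, and call release() in a finally block when it returns True.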
ClickHouseManager:
- Purpose: Singleton lazy-init connection wrapper around clickhouse_connect.AsyncClient
- Examples: app/core/clickhouse.py
- Pattern: Global module-level instance clickhouse_manager, injected into services at call time via await clickhouse_manager.get_client()
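The lazy-init singleton pattern can be sketched independently of ClickHouse. Here LazyClientManager and the factory argument are assumptions; in the real module the factory would presumably create the clickhouse_connect async client, which is replaced below by a stand-in object so the example runs without a database.

```python
import asyncio
from typing import Awaitable, Callable, Generic, List, Optional, TypeVar

T = TypeVar("T")


class LazyClientManager(Generic[T]):
    """Create the client on first use and hand back the same instance afterwards."""

    def __init__(self, factory: Callable[[], Awaitable[T]]) -> None:
        self._factory = factory
        self._client: Optional[T] = None
        self._lock: Optional[asyncio.Lock] = None

    async def get_client(self) -> T:
        if self._client is None:
            if self._lock is None:
                # Created lazily so the lock belongs to the running event loop.
                self._lock = asyncio.Lock()
            async with self._lock:
                if self._client is None:  # re-check after waiting for the lock
                    self._client = await self._factory()
        return self._client


calls: List[int] = []


async def fake_connect() -> object:
    """Stand-in for opening a real connection."""
    calls.append(1)
    await asyncio.sleep(0)
    return object()


# Module-level instance, mirroring the global clickhouse_manager described above.
manager = LazyClientManager(fake_connect)


async def main() -> List[object]:
    return list(await asyncio.gather(*(manager.get_client() for _ in range(5))))


clients = asyncio.run(main())
```

The second check inside the lock matters because several coroutines can pass the first check while the connection is still being opened.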
BaseFetcher / BaseSearcher:
- Purpose: Abstract base for platform-specific API clients within the in-process crawler wrappers
- Examples: app/services/crawler/_base.py
- Pattern: Template method; subclasses implement _build_params(), base handles HTTP invocation and ApiResult parsing
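The template-method split described here can be illustrated as follows. Only the names BaseSearcher, ApiResult, and _build_params() come from the text; the ApiResult fields, the injected transport callable (used instead of requests so the example runs offline), the search() signature, and ExampleSearcher with its URL and parameter names are all hypothetical.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional

Transport = Callable[[str, Dict[str, Any]], Dict[str, Any]]


@dataclass
class ApiResult:
    success: bool
    data: Any = None
    error: Optional[str] = None


class BaseSearcher(ABC):
    url: str = ""

    def __init__(self, transport: Transport) -> None:
        self._transport = transport

    @abstractmethod
    def _build_params(self, keyword: str, page: int) -> Dict[str, Any]:
        """Platform-specific request parameters; the only step subclasses must supply."""

    def search(self, keyword: str, page: int = 1) -> ApiResult:
        # The fixed algorithm: build params, send the request, wrap the outcome.
        try:
            payload = self._transport(self.url, self._build_params(keyword, page))
        except Exception as exc:
            return ApiResult(success=False, error=str(exc))
        return ApiResult(success=True, data=payload)


class ExampleSearcher(BaseSearcher):
    url = "https://example.invalid/search"  # placeholder, not a real endpoint

    def _build_params(self, keyword: str, page: int) -> Dict[str, Any]:
        return {"q": keyword, "page": page, "size": 20}


def echo_transport(url: str, params: Dict[str, Any]) -> Dict[str, Any]:
    """Fake transport that returns what it was asked to send."""
    return {"url": url, "params": params}


def failing_transport(url: str, params: Dict[str, Any]) -> Dict[str, Any]:
    raise TimeoutError("upstream timed out")


ok = ExampleSearcher(echo_transport).search("python", page=2)
failed = ExampleSearcher(failing_transport).search("python")
```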
BaseModel (Tortoise):
- Purpose: Common ORM base with BigIntField pk, to_dict() async serializer with M2M support
- Examples: app/models/base.py
- Pattern: Mixed-in with TimestampMixin (created_at, updated_at auto fields)
Entry Points
Application Entry:
- Location: run.py
- Triggers: python run.py, which reads APP_HOST, APP_PORT, UVICORN_WORKERS env vars and calls uvicorn.run("app:app")
- Responsibilities: Configure uvicorn logging format, start ASGI server
FastAPI App Factory:
- Location: app/__init__.py → create_app()
- Triggers: Uvicorn import of app:app
- Responsibilities: Register middlewares, exception handlers, routers, static files; define lifespan
Lifespan Bootstrap:
- Location: app/__init__.py (lifespan context manager) + app/core/init_app.py (init_data())
- Triggers: FastAPI startup event
- Responsibilities: Tortoise-ORM init + schema generation → Aerich migrations → seed data (superuser, menus, APIs, roles) → ClickHouse table init → APScheduler start
Router Aggregation:
- Location: app/api/v1/__init__.py
- Triggers: Imported by app/api/__init__.py → app/__init__.py
- Responsibilities: Mount all domain routers under /api/v1/ with appropriate DependPermission guards
ECS Pipeline:
- Location: ecs_full_pipeline.py
- Triggers: Manual execution or scheduled via scheduler.py (ecs_full_pipeline_job) every 6 hours
- Responsibilities: Create Alibaba Cloud ECS spot instances, push spider scripts, execute crawlers, terminate instances
Error Handling
Strategy: Exception handler registration at FastAPI app level; structured error propagation up through layers
Patterns:
- Tortoise DoesNotExist → 404 via DoesNotExistHandle (registered in app/core/init_app.py)
- Pydantic RequestValidationError → 422 with field-level detail via RequestValidationHandle
- IntegrityError (DB constraint) → 400 via IntegrityHandle
- Custom HTTPException (from app/core/exceptions.py) → structured JSON error via HttpExcHandle
- Service layer exceptions logged with loguru then re-raised or returned as {"success": False, "error": "..."}
- Scheduled jobs catch all exceptions, record failure via _record_task_run() to ScheduledTaskRun table, then continue
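The catch, record, and continue behaviour of scheduled jobs can be expressed as a decorator. This is an illustrative sketch only: the recorded_job decorator and the record callable (a stand-in for _record_task_run(), whose real signature is not given above) are assumptions, and the job bodies are placeholders.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable, List, Optional, Tuple

Recorder = Callable[[str, str, Optional[str]], None]


def recorded_job(name: str, record: Recorder):
    """Wrap an async job so failures are recorded instead of propagating."""

    def decorator(func: Callable[..., Awaitable[Any]]):
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            try:
                result = await func(*args, **kwargs)
            except Exception as exc:
                record(name, "failed", str(exc))
                return None  # swallow the error so the scheduler keeps running
            record(name, "success", None)
            return result

        return wrapper

    return decorator


runs: List[Tuple[str, str, Optional[str]]] = []


def record_run(name: str, status: str, error: Optional[str]) -> None:
    """In-memory stand-in for writing a ScheduledTaskRun row."""
    runs.append((name, status, error))


@recorded_job("company_cleaning", record_run)
async def cleaning_job() -> int:
    return 30  # placeholder result


@recorded_job("ecs_full_pipeline", record_run)
async def pipeline_job() -> None:
    raise RuntimeError("instance quota exceeded")  # simulated failure


async def main() -> Tuple[Any, Any]:
    return await cleaning_job(), await pipeline_job()


outcome = asyncio.run(main())
```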
Cross-Cutting Concerns
Logging: loguru — imported from app/log/__init__.py; structured logger.info/error/warning calls throughout; external spider logs to rotating daily files in jobs_spider/*/logs/
Validation: Pydantic v2 schemas in app/schemas/ validate all inbound HTTP request bodies; ClickHouse inserts use registry-defined column extractors for type safety
Authentication: JWT HS256 tokens issued by /api/v1/base/access_token; verified per-request by AuthControl.is_authed() injected via DependAuth / DependPermission; dev bypass (token: dev) available when APP_ENV=development
Audit Logging: HttpAuditLogMiddleware records all non-excluded mutating requests (method, path, user, request args, response body, timing) to MySQL auditlog table