import os from typing import Optional, Dict, Any import jwt from fastapi import Depends, Header, HTTPException, Request from loguru import logger from app.core.ctx import CTX_USER_ID from app.models import Role, User from app.settings import settings def get_list_params(skip: int = 0, limit: int = 10, filters: Dict[str, Any] = None, sort_by: str = None, sort_order: str = "desc"): """获取列表查询参数""" from app.core.crud import ListParams return ListParams( skip=skip, limit=limit, filters=filters or {}, sort_by=sort_by, sort_order=sort_order ) class AuthControl: @classmethod async def is_authed(cls, token: str = Header(..., description="token验证")) -> Optional["User"]: try: if token == "dev" and os.getenv("APP_ENV", "production") == "development": user = await User.filter().first() user_id = user.id else: decode_data = jwt.decode(token, settings.SECRET_KEY, algorithms=settings.JWT_ALGORITHM) user_id = decode_data.get("user_id") user = await User.filter(id=user_id).first() if not user: raise HTTPException(status_code=401, detail="Authentication failed") CTX_USER_ID.set(int(user_id)) return user except jwt.DecodeError: raise HTTPException(status_code=401, detail="无效的Token") except jwt.ExpiredSignatureError: raise HTTPException(status_code=401, detail="登录已过期") except Exception as e: logger.error(f"Auth error: {repr(e)}") raise HTTPException(status_code=500, detail="Internal authentication error") class PermissionControl: @classmethod async def has_permission(cls, request: Request, current_user: User = Depends(AuthControl.is_authed)) -> None: if current_user.is_superuser: return method = request.method path = request.url.path roles: list[Role] = await current_user.roles if not roles: raise HTTPException(status_code=403, detail="The user is not bound to a role") apis = [await role.apis for role in roles] permission_apis = list(set((api.method, api.path) for api in sum(apis, []))) # path = "/api/v1/auth/userinfo" # method = "GET" if (method, path) not in permission_apis: raise HTTPException(status_code=403, detail=f"Permission denied method:{method} path:{path}") DependAuth = Depends(AuthControl.is_authed) DependPermission = Depends(PermissionControl.has_permission)