68 lines
2.6 KiB
Python
68 lines
2.6 KiB
Python
import os
|
|
from typing import Optional, Dict, Any
|
|
import jwt
|
|
from fastapi import Depends, Header, HTTPException, Request
|
|
from loguru import logger
|
|
|
|
from app.core.ctx import CTX_USER_ID
|
|
from app.models import Role, User
|
|
from app.settings import settings
|
|
|
|
|
|
def get_list_params(skip: int = 0, limit: int = 10, filters: Dict[str, Any] = None, sort_by: str = None, sort_order: str = "desc"):
|
|
"""获取列表查询参数"""
|
|
from app.core.crud import ListParams
|
|
return ListParams(
|
|
skip=skip,
|
|
limit=limit,
|
|
filters=filters or {},
|
|
sort_by=sort_by,
|
|
sort_order=sort_order
|
|
)
|
|
|
|
|
|
class AuthControl:
|
|
@classmethod
|
|
async def is_authed(cls, token: str = Header(..., description="token验证")) -> Optional["User"]:
|
|
try:
|
|
if token == "dev" and os.getenv("APP_ENV", "production") == "development":
|
|
user = await User.filter().first()
|
|
user_id = user.id
|
|
else:
|
|
decode_data = jwt.decode(token, settings.SECRET_KEY, algorithms=settings.JWT_ALGORITHM)
|
|
user_id = decode_data.get("user_id")
|
|
user = await User.filter(id=user_id).first()
|
|
if not user:
|
|
raise HTTPException(status_code=401, detail="Authentication failed")
|
|
CTX_USER_ID.set(int(user_id))
|
|
return user
|
|
except jwt.DecodeError:
|
|
raise HTTPException(status_code=401, detail="无效的Token")
|
|
except jwt.ExpiredSignatureError:
|
|
raise HTTPException(status_code=401, detail="登录已过期")
|
|
except Exception as e:
|
|
logger.error(f"Auth error: {repr(e)}")
|
|
raise HTTPException(status_code=500, detail="Internal authentication error")
|
|
|
|
|
|
class PermissionControl:
|
|
@classmethod
|
|
async def has_permission(cls, request: Request, current_user: User = Depends(AuthControl.is_authed)) -> None:
|
|
if current_user.is_superuser:
|
|
return
|
|
method = request.method
|
|
path = request.url.path
|
|
roles: list[Role] = await current_user.roles
|
|
if not roles:
|
|
raise HTTPException(status_code=403, detail="The user is not bound to a role")
|
|
apis = [await role.apis for role in roles]
|
|
permission_apis = list(set((api.method, api.path) for api in sum(apis, [])))
|
|
# path = "/api/v1/auth/userinfo"
|
|
# method = "GET"
|
|
if (method, path) not in permission_apis:
|
|
raise HTTPException(status_code=403, detail=f"Permission denied method:{method} path:{path}")
|
|
|
|
|
|
DependAuth = Depends(AuthControl.is_authed)
|
|
DependPermission = Depends(PermissionControl.has_permission)
|