You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

62 lines
1.7 KiB

"""
AmazingData 数据服务平台 - 认证依赖
"""
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from backend.models.database import get_db
from backend.models.tables import User
from backend.auth.jwt_handler import decode_access_token
from typing import Optional
# OAuth2 密码流
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login", auto_error=False)
async def get_current_user(
token: Optional[str] = Depends(oauth2_scheme),
db: Session = Depends(get_db)
) -> Optional[User]:
"""获取当前用户(可选认证)"""
if not token:
return None
payload = decode_access_token(token)
if payload is None:
return None
username: str = payload.get("sub")
if username is None:
return None
user = db.query(User).filter(User.username == username).first()
if user is None or not user.is_active:
return None
return user
async def get_current_active_user(
current_user: Optional[User] = Depends(get_current_user)
) -> User:
"""获取当前活跃用户(需要认证)"""
if current_user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
return current_user
async def require_admin(
current_user: User = Depends(get_current_active_user)
) -> User:
"""要求管理员权限"""
if current_user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admin privileges required"
)
return current_user