You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

150 lines
4.2 KiB

"""
用户权限系统 - API路由
"""
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from pydantic import BaseModel
from datetime import datetime
from typing import Optional
from app.database import get_db
from app.user_models import User
from app import auth_service
router = APIRouter(prefix="/auth", tags=["用户认证"])
security = HTTPBearer()
# 请求模型
class LoginRequest(BaseModel):
    """Request body accepted by the ``/auth/login`` endpoint."""

    # Account name submitted by the client.
    username: str
    # Password as submitted by the client; handed to auth_service for checking.
    password: str
class LoginResponse(BaseModel):
    """Response body returned by the ``/auth/login`` endpoint.

    Only ``success`` is mandatory; the remaining fields default to ``None``.
    """

    # Whether the login attempt was accepted.
    success: bool
    # Session token issued for the user; left as None when login fails.
    token: Optional[str] = None
    # Summary of the logged-in user as a plain dict; None when login fails.
    user: Optional[dict] = None
    # Human-readable outcome message for the client.
    message: Optional[str] = None
class UserInfo(BaseModel):
    """Basic description of a user account.

    ``email`` is optional and defaults to ``None``, matching how the optional
    fields of ``LoginResponse`` are declared.
    """

    # Primary identifier of the user.
    id: int
    # Login name of the user.
    username: str
    # Role name; the value 'admin' is what require_admin checks for.
    role: str
    # Explicit default so the field is optional regardless of pydantic version:
    # a bare ``Optional[str]`` is implicitly optional in v1 but *required* in v2.
    email: Optional[str] = None
# 登录接口
@router.post("/login", response_model=LoginResponse)
def login(request: LoginRequest, db: Session = Depends(get_db)):
    """Authenticate a user and open a new session.

    Args:
        request: Login payload carrying the username and password.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A ``LoginResponse``. When authentication fails, ``success`` is False
        and only ``message`` is set (no exception is raised here). On success
        it also carries the new session token and a summary of the user.
    """
    account = auth_service.authenticate_user(db, request.username, request.password)

    # Guard clause: a failed login is reported in the body, not as an HTTP error.
    if not account:
        return LoginResponse(success=False, message="用户名或密码错误")

    # Create the session first, then call update_last_login (original order kept).
    session_token = auth_service.create_session(db, account.id)
    auth_service.update_last_login(db, account)

    profile = {
        "id": account.id,
        "username": account.username,
        "role": account.role,
        "email": account.email,
    }
    return LoginResponse(
        success=True,
        token=session_token,
        user=profile,
        message="登录成功",
    )
# 登出接口
@router.post("/logout")
def logout(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
):
    """Invalidate the session identified by the caller's bearer token.

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with ``success`` set to True and a confirmation message. The
        result of ``invalidate_session`` is not inspected.
    """
    bearer_token = credentials.credentials
    auth_service.invalidate_session(db, bearer_token)

    payload = {"success": True, "message": "已登出"}
    return payload
# 验证令牌接口
@router.get("/verify")
def verify_token(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
):
    """Check a bearer token and return the user it belongs to.

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with ``success`` set to True and a ``user`` dict holding id,
        username, role, email and ``last_login`` (ISO 8601 string, or None
        when no last login is recorded).

    Raises:
        HTTPException: 401 when ``validate_session`` returns a falsy value.
    """
    user = auth_service.validate_session(db, credentials.credentials)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="无效的会话令牌",
            # A 401 response must carry an authentication challenge
            # (RFC 9110, section 15.5.2).
            headers={"WWW-Authenticate": "Bearer"},
        )

    # last_login may be unset, so only format it when present.
    last_login = user.last_login.isoformat() if user.last_login else None
    return {
        "success": True,
        "user": {
            "id": user.id,
            "username": user.username,
            "role": user.role,
            "email": user.email,
            "last_login": last_login,
        },
    }
# 获取当前用户信息
@router.get("/me")
def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
):
    """Return the profile of the user who owns the supplied bearer token.

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with id, username, role, email, ``is_active``, ``created_at``
        (ISO 8601 string) and ``last_login`` (ISO 8601 string, or None when
        no last login is recorded).

    Raises:
        HTTPException: 401 when ``validate_session`` returns a falsy value.
    """
    user = auth_service.validate_session(db, credentials.credentials)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="无效的会话令牌",
            # A 401 response must carry an authentication challenge
            # (RFC 9110, section 15.5.2).
            headers={"WWW-Authenticate": "Bearer"},
        )

    # last_login may be unset, so only format it when present.
    last_login = user.last_login.isoformat() if user.last_login else None
    return {
        "id": user.id,
        "username": user.username,
        "role": user.role,
        "email": user.email,
        "is_active": user.is_active,
        # NOTE(review): created_at is dereferenced without a None check,
        # unlike last_login - confirm the column is non-nullable.
        "created_at": user.created_at.isoformat(),
        "last_login": last_login,
    }
# 依赖注入:验证用户已登录
def get_current_user_dependency(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
):
    """Dependency that resolves the bearer token to the logged-in user.

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The object returned by ``auth_service.validate_session``.

    Raises:
        HTTPException: 401 when ``validate_session`` returns a falsy value.
    """
    user = auth_service.validate_session(db, credentials.credentials)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="请先登录",
            # A 401 response must carry an authentication challenge
            # (RFC 9110, section 15.5.2).
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user
# 依赖注入:仅管理员可访问
def require_admin(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
):
    """Dependency that admits only logged-in users whose role is 'admin'.

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The object returned by ``auth_service.validate_session``.

    Raises:
        HTTPException: 401 when ``validate_session`` returns a falsy value;
            403 when the user's ``role`` is not ``'admin'``.
    """
    user = auth_service.validate_session(db, credentials.credentials)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="请先登录",
            # A 401 response must carry an authentication challenge
            # (RFC 9110, section 15.5.2).
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Authenticated but not privileged: 403, which needs no challenge header.
    if user.role != 'admin':
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="需要管理员权限",
        )
    return user