"""
User permission system - API routes.

Exposes the ``/auth`` endpoints (login, logout, token verification and
current-user lookup) together with two reusable FastAPI dependencies:
``get_current_user_dependency`` and ``require_admin``.

All credential and session handling is delegated to ``app.auth_service``;
this module only translates its results into HTTP responses.
"""
from datetime import datetime
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from sqlalchemy.orm import Session

from app.database import get_db
from app.user_models import User
from app import auth_service

router = APIRouter(prefix="/auth", tags=["用户认证"])
# Extracts the token from the "Authorization: Bearer <token>" request header.
security = HTTPBearer()


# Request / response models
class LoginRequest(BaseModel):
    """Body of ``POST /auth/login``."""

    username: str
    password: str


class LoginResponse(BaseModel):
    """Result of a login attempt.

    ``token`` and ``user`` are only populated by ``login`` when ``success``
    is true.
    """

    success: bool
    token: Optional[str] = None
    user: Optional[dict] = None
    message: Optional[str] = None


class UserInfo(BaseModel):
    """Basic public fields of a user account."""

    id: int
    username: str
    role: str
    email: Optional[str]


# Internal helpers
def _require_session_user(db: Session, token: str, detail: str):
    """Return the user bound to ``token`` or raise HTTP 401.

    Args:
        db: Database session passed through to ``auth_service``.
        token: Bearer token taken from the request.
        detail: Error message placed in the 401 response body.

    Returns:
        Whatever truthy object ``auth_service.validate_session`` returns
        (presumably an ``app.user_models.User`` - confirm against
        ``auth_service``).

    Raises:
        HTTPException: 401 when ``validate_session`` returns a falsy value.
    """
    user = auth_service.validate_session(db, token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            # A 401 response is expected to carry an authentication
            # challenge for the scheme the client should use.
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user


def _iso_or_none(value):
    """Return ``value.isoformat()``, or ``None`` when ``value`` is falsy."""
    return value.isoformat() if value else None


# Login endpoint
@router.post("/login", response_model=LoginResponse)
def login(request: LoginRequest, db: Session = Depends(get_db)):
    """Log a user in and open a session.

    Failed authentication is reported in the response body
    (``success=False``) rather than through an HTTP error status.
    """
    user = auth_service.authenticate_user(db, request.username, request.password)
    if not user:
        return LoginResponse(
            success=False,
            message="用户名或密码错误",
        )

    # Create the session, then record the login on the user.
    token = auth_service.create_session(db, user.id)
    auth_service.update_last_login(db, user)

    return LoginResponse(
        success=True,
        token=token,
        user={
            "id": user.id,
            "username": user.username,
            "role": user.role,
            "email": user.email,
        },
        message="登录成功",
    )


# Logout endpoint
@router.post("/logout")
def logout(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
):
    """Log the user out by invalidating the session for the bearer token.

    The token is not validated first; the response is always a success
    payload unless ``auth_service.invalidate_session`` raises.
    """
    auth_service.invalidate_session(db, credentials.credentials)
    return {"success": True, "message": "已登出"}


# Token verification endpoint
@router.get("/verify")
def verify_token(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
):
    """Verify the bearer token and return the user it belongs to.

    Raises:
        HTTPException: 401 when the token does not map to a valid session.
    """
    user = _require_session_user(db, credentials.credentials, "无效的会话令牌")
    return {
        "success": True,
        "user": {
            "id": user.id,
            "username": user.username,
            "role": user.role,
            "email": user.email,
            "last_login": _iso_or_none(user.last_login),
        },
    }


# Current user information
@router.get("/me")
def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
):
    """Return the profile of the user who owns the bearer token.

    Raises:
        HTTPException: 401 when the token does not map to a valid session.
    """
    user = _require_session_user(db, credentials.credentials, "无效的会话令牌")
    return {
        "id": user.id,
        "username": user.username,
        "role": user.role,
        "email": user.email,
        "is_active": user.is_active,
        # Guarded like last_login so a missing timestamp yields null
        # instead of an AttributeError (HTTP 500).
        "created_at": _iso_or_none(user.created_at),
        "last_login": _iso_or_none(user.last_login),
    }


# Dependency: require a logged-in user
def get_current_user_dependency(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
):
    """FastAPI dependency that resolves the current user.

    Returns:
        The user object produced by ``auth_service.validate_session``.

    Raises:
        HTTPException: 401 when the token does not map to a valid session.
    """
    return _require_session_user(db, credentials.credentials, "请先登录")


# Dependency: administrators only
def require_admin(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
):
    """FastAPI dependency that only admits users whose role is ``'admin'``.

    Returns:
        The authenticated admin user.

    Raises:
        HTTPException: 401 when the token does not map to a valid session;
            403 when the user's ``role`` is not ``'admin'``.
    """
    user = _require_session_user(db, credentials.credentials, "请先登录")
    if user.role != 'admin':
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="需要管理员权限",
        )
    return user