""" 用户管理 API 路由 """ from typing import Annotated, List from fastapi import APIRouter, Depends, HTTPException, status, Query from sqlalchemy.orm import Session from app.schemas import ( UserCreate, UserResponse, UserUpdate, ResponseData, PageParams ) from app.services.auth_service import get_password_hash from app.api.v1.auth import get_current_user from app.models import User from app.db.init_db import get_sqlite_db router = APIRouter() @router.post("", response_model=ResponseData) async def create_user( request: UserCreate, db: Session = Depends(get_sqlite_db) ): """ 创建新用户 - **username**: 用户名 - **password**: 密码 - **email**: 邮箱 (可选) """ # 检查用户名是否已存在 existing = db.query(User).filter(User.username == request.username).first() if existing: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Username already registered" ) # 创建用户 user = User( username=request.username, password_hash=get_password_hash(request.password), email=request.email, role="user" ) db.add(user) db.commit() db.refresh(user) return ResponseData( code=0, message="success", data={ "id": user.id, "username": user.username, "email": user.email, "role": user.role, "created_at": user.created_at.isoformat() } ) @router.get("/me", response_model=ResponseData) async def get_current_user_info( current_user: Annotated[User, Depends(get_current_user)] ): """获取当前用户信息""" return ResponseData( code=0, message="success", data={ "id": current_user.id, "username": current_user.username, "email": current_user.email, "role": current_user.role, "is_active": current_user.is_active, "created_at": current_user.created_at.isoformat() } ) @router.put("/me", response_model=ResponseData) async def update_current_user( request: UserUpdate, current_user: Annotated[User, Depends(get_current_user)], db: Session = Depends(get_sqlite_db) ): """更新当前用户信息""" if request.email is not None: current_user.email = request.email if request.password is not None: current_user.password_hash = get_password_hash(request.password) from datetime import datetime current_user.updated_at = datetime.utcnow() db.commit() return ResponseData( code=0, message="success", data={ "id": current_user.id, "username": current_user.username, "email": current_user.email, "role": current_user.role } ) @router.get("", response_model=ResponseData) async def list_users( page: Annotated[int, Query(ge=1)] = 1, page_size: Annotated[int, Query(ge=1, le=100)] = 10, current_user: Annotated[User, Depends(get_current_user)] = None, db: Session = Depends(get_sqlite_db) ): """获取用户列表 (仅管理员)""" if current_user.role != "admin": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required" ) offset = (page - 1) * page_size users = db.query(User).offset(offset).limit(page_size).all() total = db.query(User).count() return ResponseData( code=0, message="success", data={ "users": [ { "id": u.id, "username": u.username, "email": u.email, "role": u.role, "is_active": u.is_active, "created_at": u.created_at.isoformat() } for u in users ], "total": total, "page": page, "page_size": page_size } ) @router.get("/{user_id}", response_model=ResponseData) async def get_user( user_id: int, current_user: Annotated[User, Depends(get_current_user)], db: Session = Depends(get_sqlite_db) ): """获取用户详情 (仅管理员)""" if current_user.role != "admin": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required" ) user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) return ResponseData( code=0, message="success", data={ "id": user.id, "username": user.username, "email": user.email, "role": user.role, "is_active": user.is_active, "created_at": user.created_at.isoformat() } ) @router.put("/{user_id}/status", response_model=ResponseData) async def update_user_status( user_id: int, is_active: bool, current_user: Annotated[User, Depends(get_current_user)], db: Session = Depends(get_sqlite_db) ): """更新用户状态 (仅管理员)""" if current_user.role != "admin": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required" ) user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) user.is_active = is_active from datetime import datetime user.updated_at = datetime.utcnow() db.commit() return ResponseData( code=0, message="success", data={ "id": user.id, "is_active": user.is_active } )