You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

223 lines
5.9 KiB

"""
用户管理 API 路由
"""
from typing import Annotated, List
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session
from app.schemas import (
UserCreate,
UserResponse,
UserUpdate,
ResponseData,
PageParams
)
from app.services.auth_service import get_password_hash
from app.api.v1.auth import get_current_user
from app.models import User
from app.db.init_db import get_sqlite_db
router = APIRouter()
@router.post("", response_model=ResponseData)
async def create_user(
request: UserCreate,
db: Session = Depends(get_sqlite_db)
):
"""
创建新用户
- **username**: 用户名
- **password**: 密码
- **email**: 邮箱 (可选)
"""
# 检查用户名是否已存在
existing = db.query(User).filter(User.username == request.username).first()
if existing:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already registered"
)
# 创建用户
user = User(
username=request.username,
password_hash=get_password_hash(request.password),
email=request.email,
role="user"
)
db.add(user)
db.commit()
db.refresh(user)
return ResponseData(
code=0,
message="success",
data={
"id": user.id,
"username": user.username,
"email": user.email,
"role": user.role,
"created_at": user.created_at.isoformat()
}
)
@router.get("/me", response_model=ResponseData)
async def get_current_user_info(
current_user: Annotated[User, Depends(get_current_user)]
):
"""获取当前用户信息"""
return ResponseData(
code=0,
message="success",
data={
"id": current_user.id,
"username": current_user.username,
"email": current_user.email,
"role": current_user.role,
"is_active": current_user.is_active,
"created_at": current_user.created_at.isoformat()
}
)
@router.put("/me", response_model=ResponseData)
async def update_current_user(
request: UserUpdate,
current_user: Annotated[User, Depends(get_current_user)],
db: Session = Depends(get_sqlite_db)
):
"""更新当前用户信息"""
if request.email is not None:
current_user.email = request.email
if request.password is not None:
current_user.password_hash = get_password_hash(request.password)
from datetime import datetime
current_user.updated_at = datetime.utcnow()
db.commit()
return ResponseData(
code=0,
message="success",
data={
"id": current_user.id,
"username": current_user.username,
"email": current_user.email,
"role": current_user.role
}
)
@router.get("", response_model=ResponseData)
async def list_users(
page: Annotated[int, Query(ge=1)] = 1,
page_size: Annotated[int, Query(ge=1, le=100)] = 10,
current_user: Annotated[User, Depends(get_current_user)] = None,
db: Session = Depends(get_sqlite_db)
):
"""获取用户列表 (仅管理员)"""
if current_user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admin access required"
)
offset = (page - 1) * page_size
users = db.query(User).offset(offset).limit(page_size).all()
total = db.query(User).count()
return ResponseData(
code=0,
message="success",
data={
"users": [
{
"id": u.id,
"username": u.username,
"email": u.email,
"role": u.role,
"is_active": u.is_active,
"created_at": u.created_at.isoformat()
}
for u in users
],
"total": total,
"page": page,
"page_size": page_size
}
)
@router.get("/{user_id}", response_model=ResponseData)
async def get_user(
user_id: int,
current_user: Annotated[User, Depends(get_current_user)],
db: Session = Depends(get_sqlite_db)
):
"""获取用户详情 (仅管理员)"""
if current_user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admin access required"
)
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
return ResponseData(
code=0,
message="success",
data={
"id": user.id,
"username": user.username,
"email": user.email,
"role": user.role,
"is_active": user.is_active,
"created_at": user.created_at.isoformat()
}
)
@router.put("/{user_id}/status", response_model=ResponseData)
async def update_user_status(
user_id: int,
is_active: bool,
current_user: Annotated[User, Depends(get_current_user)],
db: Session = Depends(get_sqlite_db)
):
"""更新用户状态 (仅管理员)"""
if current_user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admin access required"
)
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
user.is_active = is_active
from datetime import datetime
user.updated_at = datetime.utcnow()
db.commit()
return ResponseData(
code=0,
message="success",
data={
"id": user.id,
"is_active": user.is_active
}
)