""" 测试中心路由 """ from typing import List from fastapi import APIRouter, Depends, Query from sqlalchemy.orm import Session from app.db.session import get_db from app.schemas.base import ResponseModel from app.schemas.test import TestRequest, RunAllTestsRequest from app.services.test_service import TestService from app.core.security import get_current_user from app.models.user import User router = APIRouter() @router.get("/categories", response_model=ResponseModel) async def get_test_categories( db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """获取测试分类""" service = TestService(db) categories = service.get_categories() return ResponseModel(data=categories) @router.get("/endpoints", response_model=ResponseModel) async def get_test_endpoints( category: str = Query(None, description="分类筛选"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """获取测试端点列表""" service = TestService(db) endpoints = service.get_endpoints(category) return ResponseModel(data=endpoints) @router.post("/run", response_model=ResponseModel) async def run_single_test( request: TestRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """执行单个接口测试""" service = TestService(db) result = service.run_test(request.endpoint, request.method, request.params or {}) return ResponseModel(data=result) @router.post("/run-all", response_model=ResponseModel) async def run_all_tests( request: RunAllTestsRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """执行全部接口测试""" service = TestService(db) result = service.run_all_tests(request.categories) return ResponseModel(data=result) @router.get("/history", response_model=ResponseModel) async def get_test_history( page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=100), db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """获取测试历史记录""" service = TestService(db) result = service.get_test_history(page, page_size) return ResponseModel(data={ "items": [{ "id": log.id, "test_name": log.test_name, "api_category": log.api_category, "api_endpoint": log.api_endpoint, "request_method": log.request_method, "status_code": log.status_code, "execution_time_ms": log.execution_time_ms, "is_success": log.is_success, "error_message": log.error_message, "created_at": log.created_at.isoformat() if log.created_at else None } for log in result["items"]], "total": result["total"], "page": result["page"], "page_size": result["page_size"], "total_pages": result["total_pages"] }) @router.get("/history/{test_id}", response_model=ResponseModel) async def get_test_detail( test_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """获取单次测试详情""" from app.models.test import APITestLog log = db.query(APITestLog).filter(APITestLog.id == test_id).first() if not log: return ResponseModel(code=404, message="测试记录不存在") return ResponseModel(data={ "id": log.id, "test_name": log.test_name, "api_category": log.api_category, "api_endpoint": log.api_endpoint, "request_method": log.request_method, "request_params": log.request_params, "response_data": log.response_data, "status_code": log.status_code, "execution_time_ms": log.execution_time_ms, "is_success": log.is_success, "error_message": log.error_message, "created_at": log.created_at.isoformat() if log.created_at else None })