""" 异常处理模块 """ from fastapi import Request from fastapi.responses import JSONResponse from fastapi.exceptions import RequestValidationError from sqlalchemy.exc import SQLAlchemyError class BusinessException(Exception): """业务异常""" def __init__(self, code: int, message: str): self.code = code self.message = message super().__init__(message) class SDKException(Exception): """SDK异常""" def __init__(self, message: str, original_error: Exception = None): self.message = message self.original_error = original_error super().__init__(message) async def business_exception_handler(request: Request, exc: BusinessException): """业务异常处理器""" return JSONResponse( status_code=200, content={ "code": exc.code, "message": exc.message, "data": None, "timestamp": datetime.utcnow().isoformat() } ) async def validation_exception_handler(request: Request, exc: RequestValidationError): """参数验证异常处理器""" errors = [] for error in exc.errors(): errors.append(f"{'.'.join(str(x) for x in error['loc'])}: {error['msg']}") return JSONResponse( status_code=400, content={ "code": 400, "message": f"参数错误: {'; '.join(errors)}", "data": None, "timestamp": datetime.utcnow().isoformat() } ) async def sqlalchemy_exception_handler(request: Request, exc: SQLAlchemyError): """数据库异常处理器""" return JSONResponse( status_code=500, content={ "code": 500, "message": f"数据库错误: {str(exc)}", "data": None, "timestamp": datetime.utcnow().isoformat() } ) async def general_exception_handler(request: Request, exc: Exception): """通用异常处理器""" return JSONResponse( status_code=500, content={ "code": 500, "message": f"服务器内部错误: {str(exc)}", "data": None, "timestamp": datetime.utcnow().isoformat() } ) from datetime import datetime