""" 测试中心服务 """ import time import logging from typing import List, Dict, Optional from datetime import date, datetime, timedelta from sqlalchemy.orm import Session from app.models.test import APITestLog from app.services.base_data_service import BaseDataService from app.services.stock_service import StockService from app.services.future_service import FutureService from app.services.finance_service import FinanceService logger = logging.getLogger(__name__) class TestService: """测试服务""" # 测试端点定义 TEST_ENDPOINTS = [ # 基础数据 {"category": "base_data", "name": "获取代码列表", "endpoint": "/api/v1/base/codes", "method": "GET", "params": {"security_type": "EXTRA_STOCK_A"}}, {"category": "base_data", "name": "获取交易日历", "endpoint": "/api/v1/base/calendar", "method": "GET", "params": {"market": "SH", "start_date": "20240101", "end_date": "20241231"}}, # 股票数据 {"category": "stock", "name": "获取股票K线", "endpoint": "/api/v1/stock/kline", "method": "GET", "params": {"codes": "000001.SZ", "start_date": "20240101", "end_date": "20241231", "period": "daily"}}, {"category": "stock", "name": "获取股票K线图", "endpoint": "/api/v1/stock/kline/000001.SZ/chart", "method": "GET", "params": {"start_date": "20240101", "end_date": "20241231", "period": "daily"}}, # 期货数据 {"category": "future", "name": "获取期货K线", "endpoint": "/api/v1/future/kline", "method": "GET", "params": {"codes": "IF2501.CFE", "start_date": "20240101", "end_date": "20241231", "period": "daily"}}, {"category": "future", "name": "获取期货K线图", "endpoint": "/api/v1/future/kline/IF2501.CFE/chart", "method": "GET", "params": {"start_date": "20240101", "end_date": "20241231", "period": "daily"}}, # 财务数据 {"category": "finance", "name": "获取资产负债表", "endpoint": "/api/v1/finance/balance-sheet", "method": "GET", "params": {"codes": "000001.SZ", "start_date": "20240930", "end_date": "20240930"}}, {"category": "finance", "name": "获取现金流量表", "endpoint": "/api/v1/finance/cash-flow", "method": "GET", "params": {"codes": "000001.SZ", "start_date": "20240930", "end_date": "20240930"}}, {"category": "finance", "name": "获取利润表", "endpoint": "/api/v1/finance/income", "method": "GET", "params": {"codes": "000001.SZ", "start_date": "20240930", "end_date": "20240930"}}, # 实时数据 {"category": "realtime", "name": "获取最新快照", "endpoint": "/api/v1/realtime/snapshot", "method": "GET", "params": {"codes": "000001.SZ"}}, # 缓存管理 {"category": "cache", "name": "获取缓存任务列表", "endpoint": "/api/v1/cache/tasks", "method": "GET", "params": {"page": 1, "page_size": 20}}, ] def __init__(self, db: Session): self.db = db self.base_service = BaseDataService(db) self.stock_service = StockService(db) self.future_service = FutureService(db) self.finance_service = FinanceService(db) def get_categories(self) -> List[Dict]: """获取测试分类""" categories = set(endpoint["category"] for endpoint in self.TEST_ENDPOINTS) category_names = { "base_data": "基础数据", "stock": "股票数据", "future": "期货数据", "realtime": "实时数据", "finance": "财务数据", "shareholder": "股东数据", "margin": "融资融券", "index": "指数数据", "etf": "ETF数据", "kzz": "可转债数据", "cache": "缓存管理" } return [ {"key": cat, "name": category_names.get(cat, cat)} for cat in sorted(categories) ] def get_endpoints(self, category: str = None) -> List[Dict]: """获取测试端点列表""" if category: return [ep for ep in self.TEST_ENDPOINTS if ep["category"] == category] return self.TEST_ENDPOINTS def run_test(self, endpoint: str, method: str, params: dict) -> Dict: """ 运行单个测试 Args: endpoint: 端点路径 method: HTTP方法 params: 请求参数 Returns: 测试结果 """ start_time = time.time() try: # 根据端点调用相应的服务方法 result = self._call_endpoint(endpoint, method, params) execution_time = int((time.time() - start_time) * 1000) return { "success": True, "endpoint": endpoint, "method": method, "status_code": 200, "execution_time_ms": execution_time, "response_data": result, "error_message": None } except Exception as e: execution_time = int((time.time() - start_time) * 1000) logger.error(f"测试失败 {endpoint}: {str(e)}") return { "success": False, "endpoint": endpoint, "method": method, "status_code": 500, "execution_time_ms": execution_time, "response_data": None, "error_message": str(e) } def _call_endpoint(self, endpoint: str, method: str, params: dict): """调用端点对应的服务方法""" from app.utils.date_utils import parse_date # 股票数据接口 if "/stock/kline" in endpoint and "/chart" in endpoint: code = endpoint.split("/")[-2] start_date = parse_date(params.get("start_date", "20240101")) end_date = parse_date(params.get("end_date", "20241231")) period = params.get("period", "daily") return self.stock_service.get_kline_chart_data(code, start_date, end_date, period) elif "/stock/kline" in endpoint: codes = params.get("codes", "").split(",") start_date = parse_date(params.get("start_date", "20240101")) end_date = parse_date(params.get("end_date", "20241231")) period = params.get("period", "daily") return self.stock_service.get_kline(codes, start_date, end_date, period) # 期货数据接口 elif "/future/kline" in endpoint and "/chart" in endpoint: code = endpoint.split("/")[-2] start_date = parse_date(params.get("start_date", "20240101")) end_date = parse_date(params.get("end_date", "20241231")) period = params.get("period", "daily") return self.future_service.get_kline_chart_data(code, start_date, end_date, period) elif "/future/kline" in endpoint: codes = params.get("codes", "").split(",") start_date = parse_date(params.get("start_date", "20240101")) end_date = parse_date(params.get("end_date", "20241231")) period = params.get("period", "daily") return self.future_service.get_kline(codes, start_date, end_date, period) # 财务数据接口 elif "/finance/balance-sheet" in endpoint: codes = params.get("codes", "").split(",") start_date = parse_date(params.get("start_date", "20240930")) end_date = parse_date(params.get("end_date", "20240930")) return self.finance_service.get_balance_sheet(codes, start_date, end_date) elif "/finance/cash-flow" in endpoint: codes = params.get("codes", "").split(",") start_date = parse_date(params.get("start_date", "20240930")) end_date = parse_date(params.get("end_date", "20240930")) return self.finance_service.get_cash_flow(codes, start_date, end_date) elif "/finance/income" in endpoint: codes = params.get("codes", "").split(",") start_date = parse_date(params.get("start_date", "20240930")) end_date = parse_date(params.get("end_date", "20240930")) return self.finance_service.get_income_statement(codes, start_date, end_date) # 基础数据接口 elif "/base/codes" in endpoint: security_type = params.get("security_type", "EXTRA_STOCK_A") return {"codes": self.base_service.get_code_list(security_type)[:10]} elif "/base/calendar" in endpoint: market = params.get("market", "SH") start_date = parse_date(params.get("start_date", "20240101")) end_date = parse_date(params.get("end_date", "20241231")) calendar = self.base_service.get_trading_calendar(market, start_date, end_date) return {"calendar": [d.isoformat() for d in calendar[:10]]} # 其他接口 else: return {"message": "测试通过"} def run_all_tests(self, categories: List[str] = None) -> Dict: """ 运行全部测试 Args: categories: 测试分类列表,None表示全部 Returns: 测试结果汇总 """ endpoints = self.get_endpoints() if categories: endpoints = [ep for ep in endpoints if ep["category"] in categories] results = [] passed = 0 failed = 0 for endpoint in endpoints: result = self.run_test( endpoint["endpoint"], endpoint["method"], endpoint.get("params", {}) ) results.append(result) if result["success"]: passed += 1 else: failed += 1 return { "total": len(results), "passed": passed, "failed": failed, "results": results } def log_test(self, test_name: str, api_category: str, api_endpoint: str, request_method: str, request_params: dict, response_data: dict, status_code: int, execution_time_ms: int, is_success: bool, error_message: str = None): """记录测试日志""" log = APITestLog( test_name=test_name, api_category=api_category, api_endpoint=api_endpoint, request_method=request_method, request_params=request_params, response_data=response_data, status_code=status_code, execution_time_ms=execution_time_ms, is_success=is_success, error_message=error_message ) self.db.add(log) self.db.commit() def get_test_history(self, page: int = 1, page_size: int = 20) -> Dict: """获取测试历史""" query = self.db.query(APITestLog).order_by(APITestLog.created_at.desc()) total = query.count() logs = query.offset((page - 1) * page_size).limit(page_size).all() return { "items": logs, "total": total, "page": page, "page_size": page_size, "total_pages": (total + page_size - 1) // page_size }