"""测试服务 - 对应Go的internal/service/test.go""" import asyncio import json from datetime import datetime, timedelta from typing import List, Optional from threading import RLock import httpx import websockets import pandas as pd from app.models import ( APITestListData, APITestCategory, APITestCase, APITestRequest, APITestResult, WSTestListData, WSTestCase, WSTestRequest, WSTestResult, WSMessage, TestHistoryRequest, TestHistoryData ) from app.core.logger import info, error class TestService: """测试服务""" def __init__(self): self.lock = RLock() self.api_history: List[APITestResult] = [] self.internal_history: List[APITestResult] = [] self.ws_history: List[WSTestResult] = [] self.history_size = 100 def get_api_test_list(self) -> APITestListData: """获取API测试列表(对外接口)""" # 固定交易时间:2026年3月2日到2026年3月6日 test_start = datetime(2026, 3, 2) test_end = datetime(2026, 3, 6) categories = [ APITestCategory( name="【对外】股票数据接口", items=[ APITestCase( id="stock_klines", name="查询股票K线", method="GET", path="/v1/stock/klines/{symbol}", description="查询指定股票的K线数据", params={ "symbol": "000001.SZ", "start": test_start.strftime("%Y%m%d"), "end": test_end.strftime("%Y%m%d"), "freq": "1d", "adjust": "qfq" } ), APITestCase( id="stock_symbols", name="查询股票列表", method="GET", path="/v1/stock/symbols", description="获取所有可用股票标的", params={"page": "1", "size": "20"} ), APITestCase( id="stock_batch", name="批量查询股票K线", method="POST", path="/v1/stock/klines/batch", description="批量查询多只股票K线", body={ "symbols": ["000001.SZ", "000002.SZ"], "start": test_start.strftime("%Y%m%d"), "end": test_end.strftime("%Y%m%d"), "freq": "1d" } ), APITestCase( id="stock_calendar", name="查询股票交易日历", method="GET", path="/v1/stock/trading-dates", description="查询股票交易日历", params={ "start": test_start.strftime("%Y%m%d"), "end": test_end.strftime("%Y%m%d") } ), ] ), APITestCategory( name="【对外】期货数据接口", items=[ APITestCase( id="futures_klines", name="查询期货K线", method="GET", path="/v1/futures/klines/{symbol}", description="查询指定期货合约的K线数据", params={ "symbol": "CU2504.SHFE", "start": test_start.strftime("%Y%m%d"), "end": test_end.strftime("%Y%m%d"), "freq": "1d" } ), APITestCase( id="futures_symbols", name="查询期货列表", method="GET", path="/v1/futures/symbols", description="获取所有可用期货标的", params={"page": "1", "size": "20"} ), APITestCase( id="futures_batch", name="批量查询期货K线", method="POST", path="/v1/futures/klines/batch", description="批量查询多个期货合约K线", body={ "symbols": ["CU2504.SHFE", "RB2505.SHFE"], "start": test_start.strftime("%Y%m%d"), "end": test_end.strftime("%Y%m%d"), "freq": "1d" } ), APITestCase( id="futures_contracts", name="查询合约列表", method="GET", path="/v1/futures/contracts", description="根据品种查询可交易合约", params={"underlying": "CU", "exchange": "SHFE"} ), APITestCase( id="futures_calendar", name="查询期货交易日历", method="GET", path="/v1/futures/trading-dates", description="查询期货交易日历", params={ "start": test_start.strftime("%Y%m%d"), "end": test_end.strftime("%Y%m%d") } ), ] ), APITestCategory( name="【对外】管理接口", items=[ APITestCase( id="admin_health", name="健康检查", method="GET", path="/v1/admin/health", description="检查服务健康状态", params={} ), APITestCase( id="admin_source_status", name="数据源状态", method="GET", path="/v1/admin/source/status", description="获取当前数据源状态", params={} ), APITestCase( id="admin_source_switch", name="切换数据源", method="POST", path="/v1/admin/source/switch", description="切换到指定数据源(amazingdata)", body={ "asset_class": "all", "source": "amazingdata", "sync_backfill": False } ), APITestCase( id="admin_system_status", name="系统状态", method="GET", path="/v1/admin/system/status", description="获取系统运行状态和资源使用情况", params={} ), ] ), APITestCategory( name="【对外】数据同步接口", items=[ APITestCase( id="admin_data_sync_full", name="全量数据同步", method="POST", path="/v1/admin/data/sync", description="手动触发全量数据同步(基础+行情+财务)", body={ "symbols": ["000001.SZ", "600519.SH"], "sync_type": "full", "start_date": "20240301", "end_date": "20240310", "asset_class": "stock" } ), APITestCase( id="admin_data_sync_base", name="同步基础K线数据", method="POST", path="/v1/admin/data/sync", description="仅同步OHLCV基础数据", body={ "symbols": ["000001.SZ"], "sync_type": "base", "freq": "1d", "start_date": "20240301", "end_date": "20240310", "asset_class": "stock" } ), APITestCase( id="admin_data_sync_quote", name="同步行情指标数据", method="POST", path="/v1/admin/data/sync", description="同步均线/MACD/涨跌幅", body={ "symbols": ["000001.SZ"], "sync_type": "quote", "start_date": "20240301", "end_date": "20240310", "asset_class": "stock" } ), APITestCase( id="admin_data_sync_finance", name="同步财务数据", method="POST", path="/v1/admin/data/sync", description="同步市值/股本/利润", body={ "symbols": ["000001.SZ"], "sync_type": "finance", "start_date": "20240301", "end_date": "20240310", "asset_class": "stock" } ), APITestCase( id="admin_data_sync_incremental", name="增量数据同步", method="POST", path="/v1/admin/data/sync/incremental", description="触发增量同步(最近30天)", body=["000001.SZ", "600519.SH"] ), ] ), ] return APITestListData(categories=categories, base_url="") def get_internal_test_list(self) -> APITestListData: """获取内部接口测试列表(对内接口 - SDK封装层)""" categories = [ APITestCategory( name="【对内】市场数据接口 (_market_data)", items=[ APITestCase( id="internal_market_query_kline", name="SDK: query_kline", method="INTERNAL", path="AmazingDataAdapter._internal.market.query_kline", description="查询K线数据(内部SDK调用)", params={"symbol": "000001.SZ", "period": "1d"} ), APITestCase( id="internal_market_query_snapshot", name="SDK: query_snapshot", method="INTERNAL", path="AmazingDataAdapter._internal.market.query_snapshot", description="查询快照数据(内部SDK调用)", params={"symbol": "000001.SZ"} ), ] ), APITestCategory( name="【对内】基础数据接口 (_base_data)", items=[ APITestCase( id="internal_base_get_code_list", name="SDK: get_code_list", method="INTERNAL", path="AmazingDataAdapter._internal.base.get_code_list", description="获取股票代码列表(内部SDK调用)", params={} ), APITestCase( id="internal_base_get_future_code_list", name="SDK: get_future_code_list", method="INTERNAL", path="AmazingDataAdapter._internal.base.get_future_code_list", description="获取期货代码列表(内部SDK调用)", params={} ), APITestCase( id="internal_base_get_code_info", name="SDK: get_code_info", method="INTERNAL", path="AmazingDataAdapter._internal.base.get_code_info", description="获取代码信息(内部SDK调用)", params={} ), APITestCase( id="internal_base_get_calendar", name="SDK: get_calendar", method="INTERNAL", path="AmazingDataAdapter._internal.base.get_calendar", description="获取交易日历(内部SDK调用)", params={} ), APITestCase( id="internal_base_get_adj_factor", name="SDK: get_adj_factor", method="INTERNAL", path="AmazingDataAdapter._internal.base.get_adj_factor", description="获取复权因子(内部SDK调用)", params={} ), APITestCase( id="internal_base_get_etf_pcf", name="SDK: get_etf_pcf", method="INTERNAL", path="AmazingDataAdapter._internal.base.get_etf_pcf", description="获取ETF申赎数据(内部SDK调用)", params={} ), ] ), APITestCategory( name="【对内】股本股东接口 (_info_data)", items=[ APITestCase( id="internal_info_get_equity_structure", name="SDK: get_equity_structure", method="INTERNAL", path="AmazingDataAdapter._internal.info.get_equity_structure", description="获取股本结构(内部SDK调用)", params={"symbol": "000001.SZ"} ), APITestCase( id="internal_info_get_share_holder", name="SDK: get_share_holder", method="INTERNAL", path="AmazingDataAdapter._internal.info.get_share_holder", description="获取股东数据(内部SDK调用)", params={"symbol": "000001.SZ"} ), APITestCase( id="internal_info_get_holder_num", name="SDK: get_holder_num", method="INTERNAL", path="AmazingDataAdapter._internal.info.get_holder_num", description="获取股东户数(内部SDK调用)", params={"symbol": "000001.SZ"} ), ] ), APITestCategory( name="【对内】财务报表接口 (_info_data)", items=[ APITestCase( id="internal_info_get_income", name="SDK: get_income", method="INTERNAL", path="AmazingDataAdapter._internal.info.get_income", description="获取利润表(内部SDK调用)", params={"symbol": "000001.SZ"} ), APITestCase( id="internal_info_get_balance_sheet", name="SDK: get_balance_sheet", method="INTERNAL", path="AmazingDataAdapter._internal.info.get_balance_sheet", description="获取资产负债表(内部SDK调用)", params={"symbol": "000001.SZ"} ), APITestCase( id="internal_info_get_cash_flow", name="SDK: get_cash_flow", method="INTERNAL", path="AmazingDataAdapter._internal.info.get_cash_flow", description="获取现金流量表(内部SDK调用)", params={"symbol": "000001.SZ"} ), ] ), APITestCategory( name="【对内】市场状态接口 (_info_data)", items=[ APITestCase( id="internal_info_get_history_stock_status", name="SDK: get_history_stock_status", method="INTERNAL", path="AmazingDataAdapter._internal.info.get_history_stock_status", description="获取历史股票状态(涨停/跌停/ST/停牌)(内部SDK调用)", params={"symbol": "000001.SZ"} ), APITestCase( id="internal_info_get_margin_summary", name="SDK: get_margin_summary", method="INTERNAL", path="AmazingDataAdapter._internal.info.get_margin_summary", description="获取融资融券汇总(内部SDK调用)", params={} ), APITestCase( id="internal_info_get_margin_detail", name="SDK: get_margin_detail", method="INTERNAL", path="AmazingDataAdapter._internal.info.get_margin_detail", description="获取融资融券明细(内部SDK调用)", params={"symbol": "000001.SZ"} ), ] ), APITestCategory( name="【对内】特色数据接口 (_info_data)", items=[ APITestCase( id="internal_info_get_long_hu_bang", name="SDK: get_long_hu_bang", method="INTERNAL", path="AmazingDataAdapter._internal.info.get_long_hu_bang", description="获取龙虎榜数据(内部SDK调用)", params={} ), APITestCase( id="internal_info_get_block_trading", name="SDK: get_block_trading", method="INTERNAL", path="AmazingDataAdapter._internal.info.get_block_trading", description="获取大宗交易数据(内部SDK调用)", params={} ), APITestCase( id="internal_info_get_index_constituent", name="SDK: get_index_constituent", method="INTERNAL", path="AmazingDataAdapter._internal.info.get_index_constituent", description="获取指数成分股(内部SDK调用)", params={"index": "000300.SH"} ), APITestCase( id="internal_info_get_index_weight", name="SDK: get_index_weight", method="INTERNAL", path="AmazingDataAdapter._internal.info.get_index_weight", description="获取指数权重(内部SDK调用)", params={"index": "000300.SH"} ), ] ), APITestCategory( name="【对内】基金可转债接口 (_info_data)", items=[ APITestCase( id="internal_info_get_fund_share", name="SDK: get_fund_share", method="INTERNAL", path="AmazingDataAdapter._internal.info.get_fund_share", description="获取基金份额(内部SDK调用)", params={} ), APITestCase( id="internal_info_get_kzz_issuance", name="SDK: get_kzz_issuance", method="INTERNAL", path="AmazingDataAdapter._internal.info.get_kzz_issuance", description="获取可转债发行数据(内部SDK调用)", params={} ), ] ), ] return APITestListData(categories=categories, base_url="") async def run_api_test(self, base_url: str, req: APITestRequest) -> APITestResult: """执行对外API测试""" # 获取测试用例 test_list = self.get_api_test_list() test_case = None for cat in test_list.categories: for item in cat.items: if item.id == req.id: test_case = item break if test_case: break if not test_case: raise ValueError(f"Test case not found: {req.id}") # 合并参数 params = dict(test_case.params) if req.params: params.update(req.params) # 构建URL url = base_url + test_case.path for k, v in params.items(): url = url.replace(f"{{{k}}}", str(v)) # 添加查询参数 if test_case.method == "GET" and params: query_parts = [] for k, v in params.items(): if f"{{{k}}}" not in test_case.path: query_parts.append(f"{k}={v}") if query_parts: url += "?" + "&".join(query_parts) # 准备请求体 body = req.body if req.body is not None else test_case.body # 执行请求 start_time = datetime.now() async with httpx.AsyncClient() as client: try: headers = {"X-API-Key": "test-api-key"} if test_case.method == "GET": response = await client.get(url, headers=headers, timeout=30) elif test_case.method == "POST": response = await client.post( url, json=body, headers=headers, timeout=30 ) else: raise ValueError(f"Unsupported method: {test_case.method}") latency = int((datetime.now() - start_time).total_seconds() * 1000) result = APITestResult( id=int(datetime.now().timestamp()), case_id=req.id, name=test_case.name, success=200 <= response.status_code < 300, status_code=response.status_code, latency=latency, request={ "method": test_case.method, "url": url, "body": body }, response=response.json() if response.headers.get("content-type", "").startswith("application/json") else response.text, timestamp=datetime.now() ) self._add_api_history(result) return result except Exception as e: latency = int((datetime.now() - start_time).total_seconds() * 1000) result = APITestResult( id=int(datetime.now().timestamp()), case_id=req.id, name=test_case.name, success=False, latency=latency, request={ "method": test_case.method, "url": url, "body": body }, error=str(e), timestamp=datetime.now() ) self._add_api_history(result) return result async def run_internal_test(self, adapter, req: APITestRequest) -> APITestResult: """执行内部接口测试(SDK封装层)""" from app.adapters.amazingdata_adapter import AmazingDataAdapter start_time = datetime.now() try: # 确保已连接 if not adapter._is_logged_in: raise RuntimeError("Adapter not connected") # 根据测试ID调用对应的内部接口 result_data = None error_msg = None if req.id == "internal_market_query_kline": symbol = req.params.get("symbol", "000001.SZ") result_data = adapter._internal.market.query_kline( code_list=[symbol], begin_date=20240301, end_date=20240310, period=10000 # SDK只支持10000(日线) ) elif req.id == "internal_market_query_snapshot": symbol = req.params.get("symbol", "000001.SZ") result_data = adapter._internal.market.query_snapshot( code_list=[symbol], begin_date=20240301, end_date=20240301 ) elif req.id == "internal_base_get_code_list": from app.adapters.amazingdata_adapter import SecurityType result_data = adapter._internal.base.get_code_list( security_type=SecurityType.STOCK_A.value ) elif req.id == "internal_base_get_future_code_list": from app.adapters.amazingdata_adapter import SecurityType result_data = adapter._internal.base.get_future_code_list( security_type=SecurityType.FUTURE.value ) elif req.id == "internal_base_get_code_info": from app.adapters.amazingdata_adapter import SecurityType result_data = adapter._internal.base.get_code_info( security_type=SecurityType.STOCK_A.value ) elif req.id == "internal_base_get_calendar": result_data = adapter._internal.base.get_calendar(market="SH") elif req.id == "internal_base_get_adj_factor": symbol = req.params.get("symbol", "000001.SZ") result_data = adapter._internal.base.get_adj_factor( code_list=[symbol], local_path=adapter.config.local_path, is_local=adapter.config.use_local_cache ) elif req.id == "internal_base_get_etf_pcf": result_data = adapter._internal.base.get_etf_pcf( code_list=["510050.SH"] ) elif req.id == "internal_info_get_equity_structure": symbol = req.params.get("symbol", "000001.SZ") result_data = adapter._internal.info.get_equity_structure( code_list=[symbol], local_path=adapter.config.local_path, is_local=adapter.config.use_local_cache ) elif req.id == "internal_info_get_share_holder": symbol = req.params.get("symbol", "000001.SZ") result_data = adapter._internal.info.get_share_holder( code_list=[symbol], local_path=adapter.config.local_path, is_local=adapter.config.use_local_cache ) elif req.id == "internal_info_get_holder_num": symbol = req.params.get("symbol", "000001.SZ") result_data = adapter._internal.info.get_holder_num( code_list=[symbol], local_path=adapter.config.local_path, is_local=adapter.config.use_local_cache ) elif req.id == "internal_info_get_income": symbol = req.params.get("symbol", "000001.SZ") result_data = adapter._internal.info.get_income( code_list=[symbol], local_path=adapter.config.local_path, is_local=adapter.config.use_local_cache ) elif req.id == "internal_info_get_balance_sheet": symbol = req.params.get("symbol", "000001.SZ") result_data = adapter._internal.info.get_balance_sheet( code_list=[symbol], local_path=adapter.config.local_path, is_local=adapter.config.use_local_cache ) elif req.id == "internal_info_get_cash_flow": symbol = req.params.get("symbol", "000001.SZ") result_data = adapter._internal.info.get_cash_flow( code_list=[symbol], local_path=adapter.config.local_path, is_local=adapter.config.use_local_cache ) elif req.id == "internal_info_get_history_stock_status": symbol = req.params.get("symbol", "000001.SZ") result_data = adapter._internal.info.get_history_stock_status( code_list=[symbol], local_path=adapter.config.local_path, is_local=adapter.config.use_local_cache, begin_date=20240301, end_date=20240310 ) elif req.id == "internal_info_get_margin_summary": result_data = adapter._internal.info.get_margin_summary( local_path=adapter.config.local_path, is_local=adapter.config.use_local_cache ) elif req.id == "internal_info_get_margin_detail": symbol = req.params.get("symbol", "000001.SZ") result_data = adapter._internal.info.get_margin_detail( code_list=[symbol], local_path=adapter.config.local_path, is_local=adapter.config.use_local_cache ) elif req.id == "internal_info_get_long_hu_bang": result_data = adapter._internal.info.get_long_hu_bang( code_list=["000001.SZ"], local_path=adapter.config.local_path, is_local=adapter.config.use_local_cache ) elif req.id == "internal_info_get_block_trading": result_data = adapter._internal.info.get_block_trading( code_list=["000001.SZ"], local_path=adapter.config.local_path, is_local=adapter.config.use_local_cache ) elif req.id == "internal_info_get_index_constituent": index_code = req.params.get("index", "000300.SH") result_data = adapter._internal.info.get_index_constituent( code_list=[index_code], local_path=adapter.config.local_path, is_local=adapter.config.use_local_cache ) elif req.id == "internal_info_get_index_weight": index_code = req.params.get("index", "000300.SH") result_data = adapter._internal.info.get_index_weight( code_list=[index_code], local_path=adapter.config.local_path, is_local=adapter.config.use_local_cache ) elif req.id == "internal_info_get_fund_share": result_data = adapter._internal.info.get_fund_share( code_list=["510050.SH"], local_path=adapter.config.local_path, is_local=adapter.config.use_local_cache ) elif req.id == "internal_info_get_kzz_issuance": result_data = adapter._internal.info.get_kzz_issuance( code_list=["110043.SH"], local_path=adapter.config.local_path, is_local=adapter.config.use_local_cache ) else: raise ValueError(f"Unknown internal test case: {req.id}") latency = int((datetime.now() - start_time).total_seconds() * 1000) # 检查结果 has_data = result_data is not None if isinstance(result_data, (dict, list)): has_data = len(result_data) > 0 elif isinstance(result_data, pd.DataFrame): has_data = not result_data.empty result = APITestResult( id=int(datetime.now().timestamp()), case_id=req.id, name=req.id.replace("internal_", "").replace("_", ":"), success=has_data, status_code=200 if has_data else 204, latency=latency, request={"params": req.params}, response={"data_count": len(result_data) if hasattr(result_data, '__len__') else 1} if has_data else None, error=None if has_data else "No data returned", timestamp=datetime.now() ) self._add_internal_history(result) return result except Exception as e: latency = int((datetime.now() - start_time).total_seconds() * 1000) result = APITestResult( id=int(datetime.now().timestamp()), case_id=req.id, name=req.id.replace("internal_", "").replace("_", ":"), success=False, latency=latency, request={"params": req.params}, error=str(e), timestamp=datetime.now() ) self._add_internal_history(result) return result def get_ws_test_list(self) -> WSTestListData: """获取WebSocket测试列表""" cases = [ WSTestCase( id="ws_subscribe_stock", name="订阅股票行情", description="订阅单只股票实时行情", action="subscribe", symbols=["000001.SZ"] ), WSTestCase( id="ws_subscribe_futures", name="订阅期货行情", description="订阅单个期货合约实时行情", action="subscribe", symbols=["CU2504.SHFE"] ), WSTestCase( id="ws_subscribe_multi", name="批量订阅", description="同时订阅多个标的", action="subscribe", symbols=["000001.SZ", "000002.SZ", "CU2504.SHFE"] ), ] return WSTestListData(cases=cases, ws_url="") async def run_ws_test(self, ws_url: str, req: WSTestRequest) -> WSTestResult: """执行WebSocket测试""" test_list = self.get_ws_test_list() test_case = None for item in test_list.cases: if item.id == req.id: test_case = item break if not test_case: raise ValueError(f"Test case not found: {req.id}") symbols = req.symbols if req.symbols else test_case.symbols result = WSTestResult( id=f"ws_{int(datetime.now().timestamp())}", case_id=req.id, timestamp=datetime.now(), messages=[] ) start_time = datetime.now() try: async with websockets.connect( ws_url, extra_headers={"X-API-Key": "test-api-key"} ) as ws: result.latency = int((datetime.now() - start_time).total_seconds() * 1000) result.success = True msg = { "action": test_case.action, "symbols": symbols } await ws.send(json.dumps(msg)) for _ in range(3): try: msg_data = await asyncio.wait_for(ws.recv(), timeout=5) result.messages.append(WSMessage( type="received", data=json.loads(msg_data), timestamp=datetime.now() )) except asyncio.TimeoutError: break except Exception as e: result.latency = int((datetime.now() - start_time).total_seconds() * 1000) result.success = False result.error = str(e) self._add_ws_history(result) return result def get_test_history(self, req: TestHistoryRequest) -> TestHistoryData: """获取测试历史""" with self.lock: limit = req.limit or 20 api_tests = [] ws_tests = [] if not req.type or req.type == "api": api_tests = self.api_history[-limit:] if not req.type or req.type == "ws": ws_tests = self.ws_history[-limit:] return TestHistoryData(api_tests=api_tests, ws_tests=ws_tests) def _add_api_history(self, result: APITestResult): """添加API测试历史""" with self.lock: self.api_history.append(result) if len(self.api_history) > self.history_size: self.api_history = self.api_history[-self.history_size:] def _add_internal_history(self, result: APITestResult): """添加内部接口测试历史""" with self.lock: self.internal_history.append(result) if len(self.internal_history) > self.history_size: self.internal_history = self.internal_history[-self.history_size:] def _add_ws_history(self, result: WSTestResult): """添加WebSocket测试历史""" with self.lock: self.ws_history.append(result) if len(self.ws_history) > self.history_size: self.ws_history = self.ws_history[-self.history_size:]