You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

269 lines
11 KiB

"""
测试中心服务
"""
import time
import logging
from typing import List, Dict, Optional
from datetime import date, datetime, timedelta
from sqlalchemy.orm import Session
from app.models.test import APITestLog
from app.services.base_data_service import BaseDataService
from app.services.stock_service import StockService
from app.services.future_service import FutureService
from app.services.finance_service import FinanceService
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class TestService:
    """Service behind the API test center.

    Holds the catalogue of testable endpoints, dispatches a test request to
    the service-layer method that backs the endpoint (no HTTP request is
    made), and writes/reads ``APITestLog`` rows through the SQLAlchemy
    session it was constructed with.
    """

    # Catalogue of endpoints offered by the test center. The "name" values
    # are user-facing strings and are intentionally left untranslated.
    TEST_ENDPOINTS = [
        # Base data
        {"category": "base_data", "name": "获取代码列表", "endpoint": "/api/v1/base/codes", "method": "GET", "params": {"security_type": "EXTRA_STOCK_A"}},
        {"category": "base_data", "name": "获取交易日历", "endpoint": "/api/v1/base/calendar", "method": "GET", "params": {"market": "SH", "start_date": "20240101", "end_date": "20241231"}},
        # Stock data
        {"category": "stock", "name": "获取股票K线", "endpoint": "/api/v1/stock/kline", "method": "GET", "params": {"codes": "000001.SZ", "start_date": "20240101", "end_date": "20241231", "period": "daily"}},
        {"category": "stock", "name": "获取股票K线图", "endpoint": "/api/v1/stock/kline/000001.SZ/chart", "method": "GET", "params": {"start_date": "20240101", "end_date": "20241231", "period": "daily"}},
        # Futures data
        {"category": "future", "name": "获取期货K线", "endpoint": "/api/v1/future/kline", "method": "GET", "params": {"codes": "IF2501.CFE", "start_date": "20240101", "end_date": "20241231", "period": "daily"}},
        {"category": "future", "name": "获取期货K线图", "endpoint": "/api/v1/future/kline/IF2501.CFE/chart", "method": "GET", "params": {"start_date": "20240101", "end_date": "20241231", "period": "daily"}},
        # Financial statements
        {"category": "finance", "name": "获取资产负债表", "endpoint": "/api/v1/finance/balance-sheet", "method": "GET", "params": {"codes": "000001.SZ", "start_date": "20240930", "end_date": "20240930"}},
        {"category": "finance", "name": "获取现金流量表", "endpoint": "/api/v1/finance/cash-flow", "method": "GET", "params": {"codes": "000001.SZ", "start_date": "20240930", "end_date": "20240930"}},
        {"category": "finance", "name": "获取利润表", "endpoint": "/api/v1/finance/income", "method": "GET", "params": {"codes": "000001.SZ", "start_date": "20240930", "end_date": "20240930"}},
        # Realtime data
        {"category": "realtime", "name": "获取最新快照", "endpoint": "/api/v1/realtime/snapshot", "method": "GET", "params": {"codes": "000001.SZ"}},
        # Cache management
        {"category": "cache", "name": "获取缓存任务列表", "endpoint": "/api/v1/cache/tasks", "method": "GET", "params": {"page": 1, "page_size": 20}},
    ]

    # Display name for each category key. Built once here instead of on every
    # get_categories() call. Keys without an entry fall back to the key itself.
    CATEGORY_NAMES = {
        "base_data": "基础数据",
        "stock": "股票数据",
        "future": "期货数据",
        "realtime": "实时数据",
        "finance": "财务数据",
        "shareholder": "股东数据",
        "margin": "融资融券",
        "index": "指数数据",
        "etf": "ETF数据",
        "kzz": "可转债数据",
        "cache": "缓存管理",
    }

    def __init__(self, db: Session):
        """Bind the service to ``db`` and build the data services it tests.

        Args:
            db: SQLAlchemy session, shared with the wrapped services and used
                directly for reading and writing test logs.
        """
        self.db = db
        self.base_service = BaseDataService(db)
        self.stock_service = StockService(db)
        self.future_service = FutureService(db)
        self.finance_service = FinanceService(db)

    def get_categories(self) -> List[Dict]:
        """Return the categories present in ``TEST_ENDPOINTS``.

        Returns:
            A list of ``{"key": ..., "name": ...}`` dicts sorted by key.
        """
        keys = {ep["category"] for ep in self.TEST_ENDPOINTS}
        return [
            {"key": key, "name": self.CATEGORY_NAMES.get(key, key)}
            for key in sorted(keys)
        ]

    def get_endpoints(self, category: Optional[str] = None) -> List[Dict]:
        """Return the test endpoint definitions, optionally for one category.

        Args:
            category: Category key to filter by; a falsy value selects all.

        Returns:
            A new list of copied endpoint dicts (including a copied
            ``params`` dict), so callers cannot mutate the shared
            class-level catalogue by accident.
        """
        return [
            {**ep, "params": dict(ep.get("params") or {})}
            for ep in self.TEST_ENDPOINTS
            if not category or ep["category"] == category
        ]

    def run_test(self, endpoint: str, method: str, params: dict) -> Dict:
        """Run a single endpoint test and describe the outcome.

        Args:
            endpoint: Endpoint path used to pick the service method.
            method: HTTP method; echoed back in the result only.
            params: Request parameters; ``None`` is treated as empty.

        Returns:
            A result dict with ``success``, ``endpoint``, ``method``,
            ``status_code``, ``execution_time_ms``, ``response_data`` and
            ``error_message``. ``status_code`` is a synthetic 200 or 500:
            the service layer is called directly rather than over HTTP.
            Exceptions from the service call are logged and reported in the
            result, never propagated.
        """
        # perf_counter is monotonic, so the elapsed time cannot go negative
        # or jump when the wall clock is adjusted.
        started = time.perf_counter()
        try:
            payload = self._call_endpoint(endpoint, method, params)
        except Exception as exc:  # boundary: a failing test must not abort the run
            elapsed_ms = int((time.perf_counter() - started) * 1000)
            logger.exception("测试失败 %s: %s", endpoint, exc)
            return {
                "success": False,
                "endpoint": endpoint,
                "method": method,
                "status_code": 500,
                "execution_time_ms": elapsed_ms,
                "response_data": None,
                "error_message": str(exc),
            }

        elapsed_ms = int((time.perf_counter() - started) * 1000)
        return {
            "success": True,
            "endpoint": endpoint,
            "method": method,
            "status_code": 200,
            "execution_time_ms": elapsed_ms,
            "response_data": payload,
            "error_message": None,
        }

    @staticmethod
    def _split_codes(raw) -> List[str]:
        """Turn a comma-separated string (or an iterable) into clean codes."""
        if raw is None:
            return []
        parts = raw.split(",") if isinstance(raw, str) else raw
        # Drop blanks so an empty "codes" value yields [] rather than [""].
        return [text for text in (str(part).strip() for part in parts) if text]

    @staticmethod
    def _date_range(params: dict, default_start: str, default_end: str):
        """Parse ``start_date``/``end_date`` from ``params`` with defaults."""
        # Kept as a function-local import, as before, so it is only resolved
        # when a date actually has to be parsed.
        from app.utils.date_utils import parse_date

        return (
            parse_date(params.get("start_date", default_start)),
            parse_date(params.get("end_date", default_end)),
        )

    @staticmethod
    def _code_from_path(endpoint: str) -> str:
        """Extract the code segment from a ``.../<code>/chart`` path."""
        # Tolerate a trailing slash so the code segment is still second-last.
        return endpoint.rstrip("/").split("/")[-2]

    def _call_endpoint(self, endpoint: str, method: str, params: dict):
        """Dispatch ``endpoint`` to the service method that backs it.

        Matching is by substring. The ``/chart`` variants are checked before
        the plain kline routes because a chart path also contains the plain
        route. ``method`` does not influence dispatch.

        Returns:
            Whatever the matched service method returns. The base-data
            branches wrap the first ten items in a dict. Unmatched endpoints
            get a fixed placeholder payload.
        """
        params = params or {}

        # Stock data
        if "/stock/kline" in endpoint and "/chart" in endpoint:
            start_date, end_date = self._date_range(params, "20240101", "20241231")
            return self.stock_service.get_kline_chart_data(
                self._code_from_path(endpoint),
                start_date,
                end_date,
                params.get("period", "daily"),
            )
        if "/stock/kline" in endpoint:
            start_date, end_date = self._date_range(params, "20240101", "20241231")
            return self.stock_service.get_kline(
                self._split_codes(params.get("codes", "")),
                start_date,
                end_date,
                params.get("period", "daily"),
            )

        # Futures data
        if "/future/kline" in endpoint and "/chart" in endpoint:
            start_date, end_date = self._date_range(params, "20240101", "20241231")
            return self.future_service.get_kline_chart_data(
                self._code_from_path(endpoint),
                start_date,
                end_date,
                params.get("period", "daily"),
            )
        if "/future/kline" in endpoint:
            start_date, end_date = self._date_range(params, "20240101", "20241231")
            return self.future_service.get_kline(
                self._split_codes(params.get("codes", "")),
                start_date,
                end_date,
                params.get("period", "daily"),
            )

        # Financial statements
        if "/finance/balance-sheet" in endpoint:
            start_date, end_date = self._date_range(params, "20240930", "20240930")
            return self.finance_service.get_balance_sheet(
                self._split_codes(params.get("codes", "")), start_date, end_date
            )
        if "/finance/cash-flow" in endpoint:
            start_date, end_date = self._date_range(params, "20240930", "20240930")
            return self.finance_service.get_cash_flow(
                self._split_codes(params.get("codes", "")), start_date, end_date
            )
        if "/finance/income" in endpoint:
            start_date, end_date = self._date_range(params, "20240930", "20240930")
            return self.finance_service.get_income_statement(
                self._split_codes(params.get("codes", "")), start_date, end_date
            )

        # Base data (responses are truncated to the first ten items)
        if "/base/codes" in endpoint:
            security_type = params.get("security_type", "EXTRA_STOCK_A")
            return {"codes": self.base_service.get_code_list(security_type)[:10]}
        if "/base/calendar" in endpoint:
            start_date, end_date = self._date_range(params, "20240101", "20241231")
            calendar = self.base_service.get_trading_calendar(
                params.get("market", "SH"), start_date, end_date
            )
            return {"calendar": [day.isoformat() for day in calendar[:10]]}

        # NOTE(review): endpoints with no branch above (currently the
        # realtime and cache entries) are reported as passed without being
        # executed. Confirm whether that is intended.
        return {"message": "测试通过"}

    def run_all_tests(self, categories: Optional[List[str]] = None) -> Dict:
        """Run every catalogued test, optionally limited to some categories.

        Args:
            categories: Category keys to run; ``None`` or empty runs all.

        Returns:
            A dict with ``total``, ``passed``, ``failed`` and the list of
            per-endpoint ``results`` as produced by ``run_test``.
        """
        endpoints = self.get_endpoints()
        if categories:
            wanted = set(categories)
            endpoints = [ep for ep in endpoints if ep["category"] in wanted]

        results = [
            self.run_test(ep["endpoint"], ep["method"], ep.get("params", {}))
            for ep in endpoints
        ]
        passed = sum(1 for result in results if result["success"])
        return {
            "total": len(results),
            "passed": passed,
            "failed": len(results) - passed,
            "results": results,
        }

    def log_test(self, test_name: str, api_category: str, api_endpoint: str,
                 request_method: str, request_params: dict, response_data: dict,
                 status_code: int, execution_time_ms: int, is_success: bool,
                 error_message: Optional[str] = None):
        """Persist one test result as an ``APITestLog`` row and commit it.

        Raises:
            Exception: Whatever the commit raised, re-raised after the
                session has been rolled back.
        """
        log = APITestLog(
            test_name=test_name,
            api_category=api_category,
            api_endpoint=api_endpoint,
            request_method=request_method,
            request_params=request_params,
            response_data=response_data,
            status_code=status_code,
            execution_time_ms=execution_time_ms,
            is_success=is_success,
            error_message=error_message,
        )
        self.db.add(log)
        try:
            self.db.commit()
        except Exception:
            # Roll back so the shared session stays usable after a failure.
            self.db.rollback()
            raise

    def get_test_history(self, page: int = 1, page_size: int = 20) -> Dict:
        """Return one page of test logs, newest first.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Rows per page; values below 1 are treated as 1.

        Returns:
            A dict with ``items``, ``total``, ``page``, ``page_size`` and
            ``total_pages``; ``page`` and ``page_size`` are the values
            actually used.
        """
        # Clamp both values: a non-positive page would produce a negative
        # OFFSET and page_size == 0 would divide by zero below.
        page = max(1, page)
        page_size = max(1, page_size)

        query = self.db.query(APITestLog).order_by(APITestLog.created_at.desc())
        total = query.count()
        logs = query.offset((page - 1) * page_size).limit(page_size).all()
        return {
            "items": logs,
            "total": total,
            "page": page,
            "page_size": page_size,
            "total_pages": (total + page_size - 1) // page_size,
        }