You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

476 lines
16 KiB

"""管理后台API路由 - 对应Go的api/admin_router.go"""
from datetime import datetime, timedelta
from fastapi import APIRouter, Depends, HTTPException, Header, Query, Body
from typing import Optional, List
from app.models import (
Response, ConfigListRequest, ConfigUpdateRequest,
ReloadRequest, AdapterToggleRequest, AdapterConfigUpdateRequest,
APITestRequest, WSTestRequest, TestHistoryRequest,
DataSyncRequest, DataSyncType, DataSyncData, DataSyncResult
)
from app.services import ConfigService, AdapterService, TestService
from app.core.config import get_config
admin_router = APIRouter()
# Module-level service instances used by the admin endpoints below
config_service = ConfigService()
adapter_service = AdapterService()
test_service = TestService()
def verify_admin_token(x_admin_token: Optional[str] = Header(None)):
    """Dependency that reads the admin token header.

    NOTE(review): no validation is performed yet; the header value (or
    ``None`` when the header is absent) is handed back unchanged, so every
    endpoint depending on this is effectively unauthenticated.
    """
    # TODO: implement token verification
    admin_token = x_admin_token
    return admin_token
# ============================================
# 系统管理接口
# ============================================
@admin_router.get("/admin/system/status", response_model=Response)
def get_system_status(token: str = Depends(verify_admin_token)):
    """Return the system status reported by the config service.

    Any exception raised while building the response is converted into an
    HTTP 500 whose detail is the exception text.
    """
    try:
        return Response(
            code=0,
            message="success",
            data=config_service.get_system_status(),
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@admin_router.post("/admin/system/reload", response_model=Response)
def reload_config(
    req: Optional[ReloadRequest] = None,
    token: str = Depends(verify_admin_token),
):
    """Hot-reload the configuration.

    When no request body is supplied, a default ``ReloadRequest`` is used.
    Any exception is converted into an HTTP 500 carrying the exception text.
    """
    try:
        reload_request = req if req is not None else ReloadRequest()
        result = config_service.reload_config(reload_request)
        return Response(code=0, message="success", data=result)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@admin_router.post("/admin/system/restart", response_model=Response)
def restart_service(
    token: str = Depends(verify_admin_token)
):
    """Restart the service.

    The restart is implemented by spawning a replacement process with the
    same interpreter and command-line arguments and then terminating the
    current process. This is intended for development environments; in
    production, prefer Docker or systemd to manage the service lifecycle.

    Returns:
        A ``Response`` announcing the pending restart. It is returned
        immediately; the actual restart happens on a background thread after
        a short delay so this response can be delivered first.
    """
    import os
    import subprocess
    import sys
    import threading
    import time

    restart_delay_seconds = 2

    def delayed_restart():
        """Wait for the HTTP response to go out, then respawn and exit."""
        time.sleep(restart_delay_seconds)

        # Relaunch with the *same* interpreter that is running now, so a
        # virtualenv / non-PATH Python is preserved on every platform.
        command = [sys.executable] + sys.argv[:]

        if os.name == 'nt':  # Windows
            # CREATE_NEW_CONSOLE already detaches the child into its own
            # console; no `start` shell builtin (and no shell=True) needed.
            subprocess.Popen(
                command,
                creationflags=subprocess.CREATE_NEW_CONSOLE
            )
        else:  # Linux/Mac
            # DEVNULL avoids opening file objects that are never closed.
            subprocess.Popen(
                command,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                start_new_session=True
            )

        # Hard-exit the current process; reached only if the spawn succeeded.
        os._exit(0)

    # Run the restart on a background thread so this request can complete.
    restart_thread = threading.Thread(target=delayed_restart, daemon=True)
    restart_thread.start()

    return Response(
        code=0,
        message=f"服务将在{restart_delay_seconds}秒后重启",
        data={"status": "restarting", "delay_seconds": restart_delay_seconds}
    )
# ============================================
# 配置管理接口
# ============================================
@admin_router.get("/admin/config", response_model=Response)
def get_config_list(
    type: Optional[str] = Query(None, description="配置类型筛选"),
    token: str = Depends(verify_admin_token)
):
    """Return the configuration list, optionally filtered by config type.

    Args:
        type: Optional config-type filter taken from the query string. When
            given, it is converted with ``ConfigType(type)``.
        token: Value produced by the admin-token dependency.

    Raises:
        HTTPException: 400 when ``type`` is rejected by ``ConfigType`` with a
            ``ValueError``; 500 for any other failure.
    """
    try:
        from app.models import ConfigType

        req = ConfigListRequest()
        if type:
            try:
                req.type = ConfigType(type)
            except ValueError:
                # A bad filter value is a client error, not a server fault.
                raise HTTPException(
                    status_code=400,
                    detail=f"Invalid config type: {type}"
                )
        data = config_service.get_config_list(req)
        return Response(code=0, message="success", data=data)
    except HTTPException:
        # Let the deliberate 400 above through without re-wrapping it.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@admin_router.put("/admin/config", response_model=Response)
def update_config(
    req: ConfigUpdateRequest,
    token: str = Depends(verify_admin_token),
):
    """Apply a configuration update through the config service.

    Any exception is converted into an HTTP 500 carrying the exception text.
    """
    try:
        return Response(
            code=0,
            message="success",
            data=config_service.update_config(req),
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@admin_router.post("/admin/config/reload", response_model=Response)
def reload_config_endpoint(
    req: Optional[ReloadRequest] = None,
    token: str = Depends(verify_admin_token),
):
    """Hot-reload the configuration; alias route that delegates to ``reload_config``."""
    response = reload_config(req, token)
    return response
# ============================================
# 适配器管理接口
# ============================================
@admin_router.get("/admin/adapters", response_model=Response)
def get_adapter_list(token: str = Depends(verify_admin_token)):
    """Return the adapter list from the adapter service.

    Any exception is converted into an HTTP 500 carrying the exception text.
    """
    try:
        return Response(
            code=0,
            message="success",
            data=adapter_service.get_adapter_list(),
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@admin_router.post("/admin/adapters/toggle", response_model=Response)
def toggle_adapter(
    req: AdapterToggleRequest,
    token: str = Depends(verify_admin_token),
):
    """Toggle an adapter's state via the adapter service.

    Returns a success ``Response`` without a data payload. Any exception is
    converted into an HTTP 500 carrying the exception text.
    """
    try:
        adapter_service.toggle_adapter(req)
        ok_response = Response(code=0, message="success")
        return ok_response
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@admin_router.put("/admin/adapters/config", response_model=Response)
def update_adapter_config(
    req: AdapterConfigUpdateRequest,
    token: str = Depends(verify_admin_token),
):
    """Update an adapter's configuration via the adapter service.

    Returns a success ``Response`` without a data payload. Any exception is
    converted into an HTTP 500 carrying the exception text.
    """
    try:
        adapter_service.update_adapter_config(req)
        ok_response = Response(code=0, message="success")
        return ok_response
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
# ============================================
# 测试管理接口
# ============================================
@admin_router.get("/admin/tests/api", response_model=Response)
def get_api_test_list(token: str = Depends(verify_admin_token)):
    """Return the API test list, stamped with the local base URL.

    The base URL always points at ``localhost`` on the configured server
    port. Any exception is converted into an HTTP 500.
    """
    try:
        listing = test_service.get_api_test_list()
        # Fill in the base URL the tests should target
        port = get_config().server.port
        listing.base_url = f"http://localhost:{port}"
        return Response(code=0, message="success", data=listing)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@admin_router.post("/admin/tests/api/run", response_model=Response)
async def run_api_test(
    req: APITestRequest,
    token: str = Depends(verify_admin_token),
):
    """Run an API test against this server's own localhost base URL.

    Any exception is converted into an HTTP 500 carrying the exception text.
    """
    try:
        port = get_config().server.port
        target = f"http://localhost:{port}"
        outcome = await test_service.run_api_test(target, req)
        return Response(code=0, message="success", data=outcome)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@admin_router.get("/admin/tests/ws", response_model=Response)
def get_ws_test_list(token: str = Depends(verify_admin_token)):
    """Return the WebSocket test list, stamped with the local stream URL.

    The URL always points at ``localhost`` on the configured server port,
    path ``/v1/stream``. Any exception is converted into an HTTP 500.
    """
    try:
        listing = test_service.get_ws_test_list()
        port = get_config().server.port
        listing.ws_url = f"ws://localhost:{port}/v1/stream"
        return Response(code=0, message="success", data=listing)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@admin_router.post("/admin/tests/ws/run", response_model=Response)
async def run_ws_test(
    req: WSTestRequest,
    token: str = Depends(verify_admin_token),
):
    """Run a WebSocket test against this server's own localhost stream URL.

    Any exception is converted into an HTTP 500 carrying the exception text.
    """
    try:
        port = get_config().server.port
        target = f"ws://localhost:{port}/v1/stream"
        outcome = await test_service.run_ws_test(target, req)
        return Response(code=0, message="success", data=outcome)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@admin_router.get("/admin/tests/history", response_model=Response)
def get_test_history(
    type: Optional[str] = Query(None, description="测试类型"),
    limit: int = Query(default=20, ge=1, le=100, description="数量限制"),
    token: str = Depends(verify_admin_token),
):
    """Return test history, optionally filtered by test type.

    ``limit`` is constrained to 1..100 by the query declaration and defaults
    to 20. Any exception is converted into an HTTP 500.
    """
    try:
        history = test_service.get_test_history(
            TestHistoryRequest(type=type, limit=limit)
        )
        return Response(code=0, message="success", data=history)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@admin_router.get("/admin/tests/internal", response_model=Response)
def get_internal_test_list(token: str = Depends(verify_admin_token)):
    """Return the internal-interface test list (SDK wrapper layer).

    Any exception is converted into an HTTP 500 carrying the exception text.
    """
    try:
        return Response(
            code=0,
            message="success",
            data=test_service.get_internal_test_list(),
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@admin_router.post("/admin/tests/internal/run", response_model=Response)
async def run_internal_test(
    req: APITestRequest,
    token: str = Depends(verify_admin_token)
):
    """Run an internal-interface test (SDK wrapper layer) on the stock adapter.

    Steps:
      1. Look up the active "stock" adapter; if none, try to connect the one
         named in the configuration and look it up again.
      2. If the adapter does not report itself as logged in, reconnect it
         using the settings stored on ``adapter.config``.
      3. Run the requested test through the test service.

    Raises:
        HTTPException: 500 with a specific detail when no stock adapter is
            available or the adapter has no stored configuration; 500 with
            the exception text for any other failure.
    """
    import logging

    try:
        # Obtain the stock adapter, connecting it on demand.
        adapter = adapter_service.get_active_adapter("stock")
        if not adapter:
            config = get_config()
            active_name = config.sources.stock.active
            await adapter_service._connect_adapter(active_name)
            adapter = adapter_service.get_active_adapter("stock")
        if not adapter:
            raise HTTPException(status_code=500, detail="Stock adapter not available")

        # Make sure the adapter is connected / logged in.
        if not getattr(adapter, '_is_logged_in', False):
            adapter_config = getattr(adapter, 'config', None)
            if not adapter_config:
                raise HTTPException(status_code=500, detail="Adapter not configured")
            # Reconnect using the previously saved settings.
            await adapter.connect({
                'username': adapter_config.username,
                'password': adapter_config.password,
                'host': adapter_config.host,
                'port': adapter_config.port,
                'local_path': adapter_config.local_path,
                'use_local_cache': adapter_config.use_local_cache
            })

        data = await test_service.run_internal_test(adapter, req)
        return Response(code=0, message="success", data=data)
    except HTTPException:
        # Deliberate HTTP errors raised above must reach the client as-is;
        # re-wrapping them would turn the detail into "500: <detail>".
        raise
    except Exception as e:
        # NOTE(review): the original called a bare `error(...)` that is not
        # imported in the visible file; the stdlib logger is used instead.
        # `.exception` appends the traceback to the log record.
        logging.getLogger(__name__).exception("Internal test error: %s", e)
        raise HTTPException(status_code=500, detail=str(e))
# ============================================
# 数据同步接口
# ============================================
def _build_sync_results(counts):
    """Convert a ``{symbol: count}`` mapping into ``DataSyncResult`` entries.

    A negative count is reported as ``count=0`` with an error message; the
    sync service's return contract is not visible here, so treating negative
    values as a failure marker follows the original code's intent
    (presumably a failure sentinel -- verify against DataSyncService).
    """
    return [
        DataSyncResult(
            symbol=symbol,
            count=max(count, 0),
            error="sync failed" if count < 0 else None,
        )
        for symbol, count in counts.items()
    ]


@admin_router.post("/admin/data/sync", response_model=Response)
async def sync_data(
    req: DataSyncRequest,
    token: str = Depends(verify_admin_token)
):
    """Manually trigger a data sync.

    Supported sync types:
    - base: base K-line data (OHLCV)
    - quote: quote indicator data (moving averages, MACD, limit up/down, ...)
    - finance: financial data (market cap, share capital, profit, ...)
    - full: full sync (all of the above)

    When ``start_date`` / ``end_date`` are omitted they default to one year
    ago and today respectively, formatted as ``YYYYMMDD``.

    Example request:
    {
        "symbols": ["600519.SH", "000001.SZ"],
        "sync_type": "full",
        "start_date": "20240101",
        "end_date": "20240301",
        "asset_class": "stock"
    }

    Returns:
        A ``Response`` wrapping ``DataSyncData`` on completion, or a
        ``Response`` with ``code=400`` when no adapter is active for the
        requested asset class.

    Raises:
        HTTPException: 500 with the exception text on any failure.
    """
    import logging
    import time
    from app.services.adapter_service import AdapterService
    from app.services.data_sync_service import DataSyncService
    from app.models import Frequency

    start_time = time.time()
    try:
        # Resolve the adapter for the requested asset class.
        # NOTE(review): this builds a fresh AdapterService rather than using
        # the module-level instance -- confirm that is intended.
        adapter_service = AdapterService()
        adapter = adapter_service.get_active_adapter(req.asset_class)
        if not adapter:
            return Response(
                code=400,
                message=f"No active adapter found for asset class: {req.asset_class}",
                data=None
            )

        sync_service = DataSyncService(adapter)

        # Default date window: the last 365 days up to today.
        end_date = req.end_date or datetime.now().strftime("%Y%m%d")
        start_date = req.start_date or (datetime.now() - timedelta(days=365)).strftime("%Y%m%d")

        run_all = req.sync_type == DataSyncType.FULL
        base_results = []
        quote_results = []
        finance_results = []

        if run_all or req.sync_type == DataSyncType.BASE:
            # The original conditional on req.freq yielded FREQ_1D in both
            # branches, so daily is the only frequency actually used.
            freq = Frequency.FREQ_1D
            base_results = _build_sync_results(
                await sync_service.sync_kline_base(
                    req.symbols, freq, start_date, end_date
                )
            )

        if run_all or req.sync_type == DataSyncType.QUOTE:
            quote_results = _build_sync_results(
                await sync_service.sync_kline_quote(
                    req.symbols, start_date, end_date
                )
            )

        if run_all or req.sync_type == DataSyncType.FINANCE:
            finance_results = _build_sync_results(
                await sync_service.sync_kline_finance(
                    req.symbols, start_date, end_date
                )
            )

        total_time = int((time.time() - start_time) * 1000)

        # Tally successes and failures across every result list.
        all_results = base_results + quote_results + finance_results
        success_count = sum(1 for r in all_results if r.count > 0)
        fail_count = sum(1 for r in all_results if r.error)

        data = DataSyncData(
            success=fail_count == 0,
            message=f"Synced {success_count} symbols, {fail_count} failed, took {total_time}ms",
            sync_type=req.sync_type,
            base_results=base_results,
            quote_results=quote_results,
            finance_results=finance_results,
            total_time_ms=total_time
        )
        return Response(code=0, message="success", data=data)
    except Exception as e:
        # NOTE(review): the original called a bare `error(...)` that is not
        # imported in the visible file; the stdlib logger is used instead.
        logging.getLogger(__name__).error("[DataSync API] Failed: %s", e)
        raise HTTPException(status_code=500, detail=str(e))
@admin_router.post("/admin/data/sync/incremental", response_model=Response)
async def sync_incremental(
    symbols: List[str] = Body(..., description="标的代码列表"),
    asset_class: str = Body("stock", description="资产类别"),
    token: str = Depends(verify_admin_token)
):
    """Trigger an incremental sync (most recent 30 days).

    Intended for a daily scheduled job that keeps recent data up to date.

    Args:
        symbols: Symbol codes to sync (required body field).
        asset_class: Asset class used to pick the adapter; defaults to
            ``"stock"``.
        token: Value produced by the admin-token dependency.

    Returns:
        A ``Response`` wrapping whatever ``sync_daily_incremental`` returns,
        or a ``Response`` with ``code=400`` when no adapter is active for the
        asset class.

    Raises:
        HTTPException: 500 with the exception text on any failure.
    """
    import logging
    from app.services.adapter_service import AdapterService
    from app.services.data_sync_service import DataSyncService

    try:
        # Resolve the adapter for the requested asset class.
        adapter_service = AdapterService()
        adapter = adapter_service.get_active_adapter(asset_class)
        if not adapter:
            return Response(
                code=400,
                message=f"No active adapter found for asset class: {asset_class}",
                data=None
            )

        sync_service = DataSyncService(adapter)
        results = await sync_service.sync_daily_incremental(symbols)
        return Response(code=0, message="success", data=results)
    except Exception as e:
        # NOTE(review): the original called a bare `error(...)` that is not
        # imported in the visible file; the stdlib logger is used instead so
        # the intended HTTPException is not masked by a NameError.
        logging.getLogger(__name__).error(
            "[DataSync API] Incremental sync failed: %s", e
        )
        raise HTTPException(status_code=500, detail=str(e))