You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

283 lines
8.2 KiB

"""
amazingData 数据源 API 路由
提供真实期货/股票数据接入接口
"""
import logging
from datetime import datetime, date, timedelta
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from app.db.init_db import get_sqlite_db, get_timescale_db
from app.services.amazing_data_service import amazing_data_service, get_amazing_data_service
from app.services.data_sync_service import DataSyncService
from app.schemas import ResponseData
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/amazing-data", tags=["amazingData 数据源"])
@router.get("/connect", response_model=ResponseData)
def connect_data_source():
"""
连接 amazingData 数据源
建立与银河证券星耀数智量化平台的连接
"""
try:
success = amazing_data_service.connect()
if success:
return ResponseData(
code=0,
message="连接成功",
data={"connected": True}
)
else:
return ResponseData(
code=-1,
message="连接失败",
data={"connected": False}
)
except Exception as e:
logger.error(f"Connect failed: {e}")
return ResponseData(
code=-1,
message=f"连接失败:{str(e)}",
data={"connected": False}
)
@router.get("/disconnect", response_model=ResponseData)
def disconnect_data_source():
"""
断开 amazingData 连接
"""
try:
amazing_data_service.disconnect()
return ResponseData(
code=0,
message="断开连接成功",
data={"connected": False}
)
except Exception as e:
logger.error(f"Disconnect failed: {e}")
return ResponseData(
code=-1,
message=f"断开连接失败:{str(e)}"
)
@router.get("/status", response_model=ResponseData)
def get_connection_status():
"""
获取连接状态
"""
return ResponseData(
code=0,
message="success",
data={
"connected": amazing_data_service._connected,
"host": amazing_data_service._config.host if amazing_data_service._config else None,
"port": amazing_data_service._config.port if amazing_data_service._config else None
}
)
@router.get("/kline", response_model=ResponseData)
def get_kline_data(
symbol: str = Query(..., description="证券代码,如 IF2406"),
period: str = Query(..., description="周期1m, 5m, 15m, 30m, 1h, 1d"),
start_date: str = Query(None, description="开始日期 YYYY-MM-DD"),
end_date: str = Query(None, description="结束日期 YYYY-MM-DD"),
security_type: str = Query("EXTRA_FUTURE", description="证券类型")
):
"""
获取 K 线数据
amazingData 获取真实的期货/股票 K 线数据
"""
try:
# 默认日期范围:最近 7 天
if not end_date:
end_date = datetime.now().strftime("%Y-%m-%d")
if not start_date:
start_date = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d")
data = amazing_data_service.get_kline_data(
symbol=symbol,
period=period,
start_date=start_date,
end_date=end_date,
security_type=security_type
)
return ResponseData(
code=0,
message="success",
data={
"symbol": symbol,
"period": period,
"count": len(data),
"records": data
}
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Get kline data failed: {e}")
raise HTTPException(status_code=500, detail=f"获取 K 线数据失败:{str(e)}")
@router.get("/quotes", response_model=ResponseData)
def get_realtime_quotes(
symbols: str = Query(..., description="证券代码列表,逗号分隔,如 IF2406,IC2406")
):
"""
获取实时行情
amazingData 获取真实的期货/股票实时行情
"""
try:
symbol_list = [s.strip() for s in symbols.split(",")]
data = amazing_data_service.get_realtime_quotes(symbol_list)
return ResponseData(
code=0,
message="success",
data={
"count": len(data),
"quotes": data
}
)
except Exception as e:
logger.error(f"Get realtime quotes failed: {e}")
raise HTTPException(status_code=500, detail=f"获取实时行情失败:{str(e)}")
@router.get("/codes", response_model=ResponseData)
def get_security_codes(
security_type: str = Query("EXTRA_FUTURE", description="证券类型"),
market: Optional[str] = Query(None, description="市场SH, SZ, BJ")
):
"""
获取证券代码列表
amazingData 获取可交易的证券代码列表
"""
try:
data = amazing_data_service.get_security_codes(
security_type=security_type,
market=market
)
return ResponseData(
code=0,
message="success",
data={
"count": len(data),
"codes": data
}
)
except Exception as e:
logger.error(f"Get security codes failed: {e}")
raise HTTPException(status_code=500, detail=f"获取证券代码失败:{str(e)}")
@router.get("/tick", response_model=ResponseData)
def get_tick_data(
symbol: str = Query(..., description="证券代码"),
date: str = Query(None, description="日期 YYYY-MM-DD默认今天"),
security_type: str = Query("EXTRA_FUTURE", description="证券类型")
):
"""
获取 Tick 数据
amazingData 获取详细的 Tick 级交易数据
"""
try:
if not date:
date = datetime.now().strftime("%Y-%m-%d")
data = amazing_data_service.get_tick_data(
symbol=symbol,
date=date,
security_type=security_type
)
return ResponseData(
code=0,
message="success",
data={
"symbol": symbol,
"date": date,
"count": len(data),
"records": data
}
)
except Exception as e:
logger.error(f"Get tick data failed: {e}")
raise HTTPException(status_code=500, detail=f"获取 Tick 数据失败:{str(e)}")
@router.post("/sync/kline", response_model=ResponseData)
def sync_kline_data(
symbol: str = Query(..., description="证券代码"),
period: str = Query(..., description="周期"),
start_date: str = Query(None, description="开始日期"),
end_date: str = Query(None, description="结束日期")
):
"""
手动同步 K 线数据
amazingData K 线数据同步到 TimescaleDB
"""
try:
count = DataSyncService.sync_kline_data(
symbol=symbol,
period=period,
start_date=start_date,
end_date=end_date
)
return ResponseData(
code=0,
message="同步成功",
data={
"symbol": symbol,
"period": period,
"records_synced": count
}
)
except Exception as e:
logger.error(f"Sync kline data failed: {e}")
raise HTTPException(status_code=500, detail=f"同步数据失败:{str(e)}")
@router.post("/sync/all", response_model=ResponseData)
async def sync_all_data():
"""
同步所有默认品种数据
批量同步所有默认品种和周期的数据到 TimescaleDB
"""
try:
result = await DataSyncService.sync_all_symbols()
return ResponseData(
code=0,
message="同步完成",
data=result
)
except Exception as e:
logger.error(f"Sync all data failed: {e}")
raise HTTPException(status_code=500, detail=f"同步数据失败:{str(e)}")