You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

198 lines
6.0 KiB

"""
K 线数据 API v2 - 缓存优先策略
金融数据中台核心接口
"""
from datetime import datetime
from typing import Annotated, List
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from app.schemas import KlineRequest, KlineResponse, KlineDataItem, ResponseData
from app.services.kline_service import KlineService
from app.db.init_db import get_sqlite_db
router = APIRouter()
@router.get("/data", response_model=KlineResponse)
async def get_kline_data_v2(
symbol: Annotated[str, Query(description="品种代码,如 IF2406, 600126.SH")],
period: Annotated[str, Query(description="周期1m, 5m, 15m, 30m, 1h, 4h, 1d, 1w")],
start: Annotated[datetime, Query(description="开始时间")],
end: Annotated[datetime, Query(description="结束时间")],
page: Annotated[int, Query(description="页码,默认 1")] = 1,
page_size: Annotated[int, Query(description="每页数量,默认 1000最大 5000")] = 1000,
use_cache: Annotated[bool, Query(description="是否使用缓存,默认 true")] = True,
):
"""
获取 K 线数据缓存优先策略v2
**核心逻辑**
1. 先查询 Redis 缓存
2. 缓存命中直接返回
3. 缓存未命中则调用 amazingData 获取数据
4. 写入缓存并返回
- **symbol**: 品种代码 ( IF2406, SH0001, 600126.SH)
- **period**: 周期 (1m, 5m, 15m, 30m, 1h, 4h, 1d, 1w)
- **start**: 开始时间 (ISO 8601 格式)
- **end**: 结束时间 (ISO 8601 格式)
- **use_cache**: 是否使用缓存 (默认 true)
"""
# 验证时间范围
if start >= end:
raise HTTPException(
status_code=400,
detail="开始时间必须早于结束时间"
)
# 验证开始时间不能晚于当前时间(允许 1 分钟误差)
from datetime import timedelta
if start > datetime.utcnow() + timedelta(minutes=1):
raise HTTPException(
status_code=400,
detail="开始时间不能晚于当前时间"
)
# 限制 page_size 最大值
if page_size > 5000:
page_size = 5000
if page_size < 1:
page_size = 1
if page < 1:
page = 1
try:
# v2 缓存优先策略
data = await KlineService.get_kline_data_v2(
symbol=symbol,
period=period,
start_date=start,
end_date=end,
page=page,
page_size=page_size,
use_cache=use_cache
)
return KlineResponse(
code=0,
message="success",
data=[KlineDataItem(**item) for item in data],
symbol=symbol,
period=period
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Failed to fetch kline data: {str(e)}"
)
@router.get("/latest", response_model=ResponseData)
async def get_latest_kline_v2(
symbol: Annotated[str, Query(description="品种代码")],
period: Annotated[str, Query(description="周期")],
use_cache: Annotated[bool, Query(description="是否使用缓存")] = True,
):
"""获取最新一条 K 线数据(缓存优先)"""
data = await KlineService.get_latest_kline_v2(symbol, period, use_cache)
if not data:
return ResponseData(
code=404,
message="No data found",
data=None
)
return ResponseData(
code=0,
message="success",
data=data
)
@router.get("/symbols", response_model=ResponseData)
async def get_symbols_v2():
"""获取所有品种代码列表"""
symbols = KlineService.get_symbols()
return ResponseData(
code=0,
message="success",
data={"symbols": symbols, "count": len(symbols)}
)
@router.get("/periods", response_model=ResponseData)
async def get_periods_v2():
"""获取所有支持的周期"""
periods = KlineService.get_periods()
return ResponseData(
code=0,
message="success",
data={"periods": periods, "count": len(periods)}
)
@router.get("/cache/stats", response_model=ResponseData)
async def get_cache_stats():
"""获取缓存统计信息"""
try:
from app.services.cache_service import cache_service
if not cache_service._connected:
return ResponseData(
code=200,
message="Redis not connected",
data={"connected": False}
)
stats = await cache_service.get_stats()
return ResponseData(
code=0,
message="success",
data={
"connected": True,
**stats
}
)
except Exception as e:
return ResponseData(
code=500,
message=f"Failed to get cache stats: {str(e)}",
data=None
)
@router.delete("/cache/clear", response_model=ResponseData)
async def clear_cache(
symbol: Annotated[str, Query(description="品种代码,不传则清空全部")] = "",
period: Annotated[str, Query(description="周期,不传则清空该品种所有周期")] = "",
):
"""清除缓存"""
try:
from app.services.cache_service import cache_service
if not symbol:
# 清空全部缓存
count = await cache_service.clear_kline_cache()
return ResponseData(
code=0,
message="All cache cleared",
data={"cleared_count": count}
)
else:
# 清空指定品种所有周期
count = await cache_service.clear_kline_cache(symbol)
return ResponseData(
code=0,
message=f"Cache cleared for symbol: {symbol}",
data={"symbol": symbol, "cleared_count": count}
)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Failed to clear cache: {str(e)}"
)