""" K 线数据 API v2 - 缓存优先策略 金融数据中台核心接口 """ from datetime import datetime from typing import Annotated, List from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy.orm import Session from app.schemas import KlineRequest, KlineResponse, KlineDataItem, ResponseData from app.services.kline_service import KlineService from app.db.init_db import get_sqlite_db router = APIRouter() @router.get("/data", response_model=KlineResponse) async def get_kline_data_v2( symbol: Annotated[str, Query(description="品种代码,如 IF2406, 600126.SH")], period: Annotated[str, Query(description="周期:1m, 5m, 15m, 30m, 1h, 4h, 1d, 1w")], start: Annotated[datetime, Query(description="开始时间")], end: Annotated[datetime, Query(description="结束时间")], page: Annotated[int, Query(description="页码,默认 1")] = 1, page_size: Annotated[int, Query(description="每页数量,默认 1000,最大 5000")] = 1000, use_cache: Annotated[bool, Query(description="是否使用缓存,默认 true")] = True, ): """ 获取 K 线数据(缓存优先策略)v2 **核心逻辑**: 1. 先查询 Redis 缓存 2. 缓存命中直接返回 3. 缓存未命中则调用 amazingData 获取数据 4. 
写入缓存并返回 - **symbol**: 品种代码 (如 IF2406, SH0001, 600126.SH) - **period**: 周期 (1m, 5m, 15m, 30m, 1h, 4h, 1d, 1w) - **start**: 开始时间 (ISO 8601 格式) - **end**: 结束时间 (ISO 8601 格式) - **use_cache**: 是否使用缓存 (默认 true) """ # 验证时间范围 if start >= end: raise HTTPException( status_code=400, detail="开始时间必须早于结束时间" ) # 验证开始时间不能晚于当前时间(允许 1 分钟误差) from datetime import timedelta if start > datetime.utcnow() + timedelta(minutes=1): raise HTTPException( status_code=400, detail="开始时间不能晚于当前时间" ) # 限制 page_size 最大值 if page_size > 5000: page_size = 5000 if page_size < 1: page_size = 1 if page < 1: page = 1 try: # v2 缓存优先策略 data = await KlineService.get_kline_data_v2( symbol=symbol, period=period, start_date=start, end_date=end, page=page, page_size=page_size, use_cache=use_cache ) return KlineResponse( code=0, message="success", data=[KlineDataItem(**item) for item in data], symbol=symbol, period=period ) except HTTPException: raise except Exception as e: raise HTTPException( status_code=500, detail=f"Failed to fetch kline data: {str(e)}" ) @router.get("/latest", response_model=ResponseData) async def get_latest_kline_v2( symbol: Annotated[str, Query(description="品种代码")], period: Annotated[str, Query(description="周期")], use_cache: Annotated[bool, Query(description="是否使用缓存")] = True, ): """获取最新一条 K 线数据(缓存优先)""" data = await KlineService.get_latest_kline_v2(symbol, period, use_cache) if not data: return ResponseData( code=404, message="No data found", data=None ) return ResponseData( code=0, message="success", data=data ) @router.get("/symbols", response_model=ResponseData) async def get_symbols_v2(): """获取所有品种代码列表""" symbols = KlineService.get_symbols() return ResponseData( code=0, message="success", data={"symbols": symbols, "count": len(symbols)} ) @router.get("/periods", response_model=ResponseData) async def get_periods_v2(): """获取所有支持的周期""" periods = KlineService.get_periods() return ResponseData( code=0, message="success", data={"periods": periods, "count": len(periods)} ) 
@router.get("/cache/stats", response_model=ResponseData)
async def get_cache_stats():
    """Get cache statistics."""
    try:
        from app.services.cache_service import cache_service

        if not cache_service._connected:
            return ResponseData(
                code=200,
                message="Redis not connected",
                data={"connected": False},
            )

        stats = await cache_service.get_stats()
        return ResponseData(
            code=0,
            message="success",
            data={"connected": True, **stats},
        )
    except Exception as e:
        # Failures are reported in the response body rather than as an HTTP error.
        return ResponseData(
            code=500,
            message=f"Failed to get cache stats: {e}",
            data=None,
        )


@router.delete("/cache/clear", response_model=ResponseData)
async def clear_cache(
    symbol: Annotated[str, Query(description="Symbol code; omit to clear all caches")] = "",
    period: Annotated[str, Query(description="Period; omit to clear all periods for the symbol")] = "",
):
    """Clear the K-line cache."""
    # NOTE: `period` is accepted but is not forwarded to the cache service,
    # so a request for one symbol always clears every period of that symbol.
    try:
        from app.services.cache_service import cache_service

        if not symbol:
            # Clear the entire cache.
            count = await cache_service.clear_kline_cache()
            return ResponseData(
                code=0,
                message="All cache cleared",
                data={"cleared_count": count},
            )

        # Clear all periods for the given symbol.
        count = await cache_service.clear_kline_cache(symbol)
        return ResponseData(
            code=0,
            message=f"Cache cleared for symbol: {symbol}",
            data={"symbol": symbol, "cleared_count": count},
        )
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to clear cache: {e}",
        ) from e