""" 数据接口 - 批量获取 / 获取最新缓存 """ import logging from typing import Optional from datetime import datetime from fastapi import APIRouter, Depends, HTTPException, UploadFile, File from sqlalchemy.orm import Session from app.database import get_db from app.schemas import ( BatchFetchRequest, BatchFetchResponse, LatestDataResponse, CandleItem, TimeframeData, SymbolDataResponse, ) from app.services.collector import fetch_symbol_data, fetch_batch from app.services.cache import ( save_market_data, get_cached_data, get_latest_cached, check_cache_status, ) from app.config import CACHE_TTL_SECONDS from datetime import datetime logger = logging.getLogger(__name__) router = APIRouter(prefix="/data", tags=["数据"]) @router.post("/batch-fetch", response_model=BatchFetchResponse) def batch_fetch(req: BatchFetchRequest, db: Session = Depends(get_db)): """ 批量获取指定品种、指定周期的数据。 智能缓存:已存在且有效的数据不重复请求。 """ symbols = req.symbols periods = req.periods data_type = req.data_type success = [] failed = [] details = {} for sym in symbols: status = check_cache_status(db, sym, data_type, periods) if status["all_valid"]: logger.info(f"[{sym}] 缓存全部命中,跳过采集") cached = get_cached_data(db, sym, data_type, periods) timeframes = [] for p, candles in cached["timeframes"].items(): # 转换数据格式: time -> datetime normalized_candles = [] for c in candles: candle_dict = dict(c) if 'time' in candle_dict and 'datetime' not in candle_dict: candle_dict['datetime'] = candle_dict.pop('time') normalized_candles.append(candle_dict) timeframes.append(TimeframeData( period=p, candles=[CandleItem(**c) for c in normalized_candles], candle_count=len(normalized_candles), fetched_at=cached.get("timestamp", ""), )) details[sym] = SymbolDataResponse( symbol=sym, data_type=data_type, current_price=cached.get("current_price"), timeframes=timeframes, source="cache", ) success.append(sym) continue need_fetch = status["missing_periods"] logger.info(f"[{sym}] 缓存部分缺失,需要采集: {need_fetch}") result = fetch_symbol_data(sym, data_type, need_fetch) if result.get("timeframes"): save_market_data(db, sym, result) success.append(sym) all_timeframes = {} if status["valid_periods"]: existing = get_cached_data(db, sym, data_type, status["valid_periods"]) if existing: all_timeframes.update(existing["timeframes"]) all_timeframes.update(result["timeframes"]) timeframes = [] for p in periods: candles = all_timeframes.get(p, []) if candles: # 转换数据格式: time -> datetime normalized_candles = [] for c in candles: candle_dict = dict(c) if 'time' in candle_dict and 'datetime' not in candle_dict: candle_dict['datetime'] = candle_dict.pop('time') normalized_candles.append(candle_dict) timeframes.append(TimeframeData( period=p, candles=[CandleItem(**c) for c in normalized_candles], candle_count=len(normalized_candles), fetched_at=result.get("timestamp", ""), )) details[sym] = SymbolDataResponse( symbol=sym, data_type=data_type, current_price=result.get("current_price"), timeframes=timeframes, source="live+cache", ) else: failed.append(sym) details[sym] = {"error": result.get("error", "未知错误")} return BatchFetchResponse( success=success, failed=failed, details=details, ) @router.get("/latest/{symbol}", response_model=SymbolDataResponse) def get_latest( symbol: str, data_type: str = "futures", period: Optional[str] = None, end_time: Optional[str] = None, db: Session = Depends(get_db), ): """ 从缓存获取最新数据。 可指定单个 period,不指定则返回所有已缓存周期。 可选指定 end_time 过滤K线数据(ISO格式),默认为当前时间。 end_time 可以是日期(YYYY-MM-DD)或日期时间(YYYY-MM-DDTHH:MM:SS)。 """ import traceback # 解析时间参数,默认为当前时间 end_dt = None if end_time: try: # 尝试解析ISO格式时间 end_dt = datetime.fromisoformat(end_time) logger.info(f"成功解析 end_time: {end_time} -> {end_dt}") except Exception as e: logger.error(f"end_time 解析失败: {end_time}, 错误: {e}") logger.error(f"错误堆栈: {traceback.format_exc()}") raise HTTPException(status_code=400, detail=f"end_time 格式错误: {str(e)}") try: cached = get_cached_data(db, symbol, data_type, [period] if period else None, end_time=end_dt) if not cached: raise HTTPException(status_code=404, detail=f"未找到 {symbol} 的缓存数据") timeframes = [] for p, candles in cached["timeframes"].items(): # 转换数据格式: time -> datetime normalized_candles = [] for c in candles: candle_dict = dict(c) if 'time' in candle_dict and 'datetime' not in candle_dict: candle_dict['datetime'] = candle_dict.pop('time') normalized_candles.append(candle_dict) timeframes.append(TimeframeData( period=p, candles=[CandleItem(**c) for c in normalized_candles], candle_count=len(normalized_candles), fetched_at=cached.get("timestamp", ""), )) return SymbolDataResponse( symbol=symbol, data_type=data_type, current_price=cached.get("current_price"), timeframes=timeframes, source="cache" if cached.get("is_fresh", False) else "cache_stale", ) except HTTPException: raise except Exception as e: logger.error(f"获取数据失败: symbol={symbol}, period={period}, end_time={end_time}") logger.error(f"错误: {e}") logger.error(f"错误堆栈: {traceback.format_exc()}") raise HTTPException(status_code=500, detail=f"服务器内部错误: {str(e)}") @router.get("/latest/{symbol}/{period}") def get_latest_by_period( symbol: str, period: str, data_type: str = "futures", db: Session = Depends(get_db), ): """ 获取缓存中指定品种+周期的最新数据。 返回单个周期的 K 线。 """ cached = get_cached_data(db, symbol, data_type, [period]) if not cached: raise HTTPException(status_code=404, detail=f"未找到 {symbol} {period} 的缓存") candles = cached["timeframes"].get(period, []) return { "symbol": symbol, "period": period, "data_type": data_type, "candles": candles, "candle_count": len(candles), "current_price": cached.get("current_price"), "fetched_at": cached.get("timestamp"), "is_fresh": cached.get("is_fresh", False), } @router.get("/cache-status/{symbol}") def cache_status(symbol: str, db: Session = Depends(get_db)): """查看品种的缓存状态""" records = get_latest_cached(db, symbol) if not records: return {"symbol": symbol, "cached_periods": [], "status": "no_data"} now = datetime.now() periods_info = [] for r in records: age_seconds = (now - r.fetched_at).total_seconds() periods_info.append({ "period": r.period, "candle_count": r.candle_count, "fetched_at": r.fetched_at.isoformat(), "age_seconds": round(age_seconds, 0), "is_fresh": age_seconds < CACHE_TTL_SECONDS, }) return { "symbol": symbol, "cached_periods": periods_info, "status": "ok", } @router.get("/latest-timestamps") def get_latest_timestamps(db: Session = Depends(get_db)): """ 获取所有品种的最新数据时间戳。 """ from app.models import SymbolTimestamp timestamps = db.query(SymbolTimestamp).all() data = [] for ts in timestamps: data.append({ "symbol": ts.symbol, "data_type": ts.data_type, "last_refresh_at": ts.last_refresh_at.isoformat() if ts.last_refresh_at else None }) return { "success": True, "data": data }