""" 期货 K 线服务 v2.2 支持多周期、持仓量、结算价等期货特有数据 """ import logging from datetime import datetime, timedelta from typing import List, Optional, Dict, Any from enum import Enum from sqlalchemy.orm import Session from app.models.kline import ( Frequency, FuturesKLineItem, FuturesKLineData, FuturesSymbolInfo, FuturesContractInfo, FuturesKLineQuery ) from app.repositories.kline.futures_repository import FuturesKLineRepository from app.services.cache_service import cache_service logger = logging.getLogger(__name__) # 支持的期货 K 线周期 FUTURES_FREQUENCIES = [ Frequency.FREQ_1M, Frequency.FREQ_5M, Frequency.FREQ_15M, Frequency.FREQ_30M, Frequency.FREQ_1H, Frequency.FREQ_1D, Frequency.FREQ_1W, Frequency.FREQ_1MONTH, ] class FuturesKLineService: """期货 K 线服务""" def __init__(self, db: Session): self.repository = FuturesKLineRepository(db) self.db = db async def query_klines( self, symbol: str, freq: Frequency, start: datetime, end: datetime, use_cache: bool = True ) -> FuturesKLineData: """ 查询期货 K 线数据 Args: symbol: 合约代码 (如 IF2406, AG2605.SHF) freq: K 线周期 start: 开始时间 end: 结束时间 use_cache: 是否使用缓存 Returns: FuturesKLineData: K 线数据响应 Raises: ValueError: 参数验证失败 """ # 参数验证 self._validate_params(symbol, freq, start, end) # 验证周期是否支持 if freq not in FUTURES_FREQUENCIES: raise ValueError(f"不支持的期货 K 线周期: {freq}") # 尝试从缓存获取 cache_key = None if use_cache: cache_key = f"futures_kline:{symbol}:{freq.value}:{start.strftime('%Y%m%d')}:{end.strftime('%Y%m%d')}" cached = await cache_service.get(cache_key) if cached: logger.info(f"缓存命中: {cache_key}") return FuturesKLineData(**cached) # 查询数据库 items = self.repository.get_klines(symbol, freq, start, end) # 如果没有数据,尝试从适配器获取 if not items: logger.info(f"数据库无 {symbol} 数据,尝试从数据源获取") items = await self._fetch_from_adapter(symbol, freq, start, end) # 保存到数据库 if items: self._save_klines_to_db(symbol, freq, items) # 获取品种信息 symbol_info = self.repository.get_symbol_info(symbol) result = FuturesKLineData( symbol=symbol, name=symbol_info.name if symbol_info else "", freq=freq, count=len(items), items=items ) # 写入缓存 if cache_key and items: await cache_service.set(cache_key, result.model_dump(), expire=300) # 5分钟缓存 return result async def query_klines_batch( self, symbols: List[str], freq: Frequency, start: datetime, end: datetime, use_cache: bool = True ) -> Dict[str, FuturesKLineData]: """ 批量查询期货 K 线数据 Args: symbols: 合约代码列表 (最多100个) freq: K 线周期 start: 开始时间 end: 结束时间 use_cache: 是否使用缓存 Returns: Dict[str, FuturesKLineData]: 合约代码 -> K线数据映射 """ if len(symbols) > 100: raise ValueError("批量查询最多支持 100 个合约") results = {} for symbol in symbols: try: results[symbol] = await self.query_klines( symbol, freq, start, end, use_cache ) except Exception as e: logger.error(f"查询 {symbol} 失败: {e}") results[symbol] = FuturesKLineData( symbol=symbol, name="", freq=freq, count=0, items=[], error=str(e) ) return results async def get_main_contract_klines( self, product_code: str, freq: Frequency, start: datetime, end: datetime ) -> Optional[FuturesKLineData]: """ 获取主力合约 K 线数据 Args: product_code: 品种代码 (如 IF, IC, AG) freq: K 线周期 start: 开始时间 end: 结束时间 Returns: Optional[FuturesKLineData]: 主力合约 K 线数据 """ # 获取主力合约 main_contract = self.repository.get_main_contract(product_code) if not main_contract: logger.warning(f"品种 {product_code} 无主力合约") return None return await self.query_klines(main_contract, freq, start, end) def get_active_contracts( self, product_code: str, limit: int = 10 ) -> List[FuturesContractInfo]: """ 获取活跃合约列表 Args: product_code: 品种代码 limit: 返回数量 Returns: List[FuturesContractInfo]: 活跃合约列表 """ return self.repository.get_active_contracts(product_code, limit) def get_contract_info(self, symbol: str) -> Optional[FuturesContractInfo]: """ 获取合约信息 Args: symbol: 合约代码 Returns: Optional[FuturesContractInfo]: 合约信息 """ return self.repository.get_contract_info(symbol) def _validate_params( self, symbol: str, freq: Frequency, start: datetime, end: datetime ) -> None: """验证查询参数""" if not symbol: raise ValueError("合约代码不能为空") if start >= end: raise ValueError("开始时间必须早于结束时间") # 时间范围限制 max_days = { Frequency.FREQ_1M: 30, Frequency.FREQ_5M: 60, Frequency.FREQ_15M: 90, Frequency.FREQ_30M: 180, Frequency.FREQ_1H: 365, Frequency.FREQ_1D: 3650, # 10年 Frequency.FREQ_1W: 3650, Frequency.FREQ_1MONTH: 3650, } days = (end - start).days max_allowed = max_days.get(freq, 365) if days > max_allowed: raise ValueError(f"{freq.value} 周期最多查询 {max_allowed} 天数据") async def _fetch_from_adapter( self, symbol: str, freq: Frequency, start: datetime, end: datetime ) -> List[FuturesKLineItem]: """ 从数据源适配器获取 K 线数据 注意: 此方法修复了原代码中的异步问题 原代码错误地使用 asyncio.new_event_loop() """ try: from app.services.amazing_data_service import AmazingDataService # 获取服务实例 service = AmazingDataService() # 转换频率格式 period_map = { Frequency.FREQ_1M: "min1", Frequency.FREQ_5M: "min5", Frequency.FREQ_15M: "min15", Frequency.FREQ_30M: "min30", Frequency.FREQ_1H: "min60", Frequency.FREQ_1D: "day", Frequency.FREQ_1W: "week", Frequency.FREQ_1MONTH: "month", } period = period_map.get(freq, "day") # 使用上下文管理器确保连接正确关闭 async with service.get_connection() as conn: # 调用适配器获取数据 df = await conn.get_kline( symbol=symbol, period=period, start_time=start.strftime("%Y%m%d%H%M%S"), end_time=end.strftime("%Y%m%d%H%M%S") ) if df is None or df.empty: logger.warning(f"适配器返回空数据: {symbol}") return [] # 转换为 KLineItem 列表 items = [] for _, row in df.iterrows(): items.append(FuturesKLineItem( symbol=symbol, time=row.get('time', row.name) if isinstance(row.get('time'), datetime) else datetime.strptime(str(row.get('time', row.name)), "%Y-%m-%d %H:%M:%S"), open=float(row.get('open', 0)), high=float(row.get('high', 0)), low=float(row.get('low', 0)), close=float(row.get('close', 0)), volume=int(row.get('volume', 0)), open_interest=int(row.get('open_interest', 0)) if 'open_interest' in row else 0, settlement_price=float(row.get('settlement', 0)) if 'settlement' in row else None, trade_date=row.get('trade_date', row.get('time', row.name).date() if hasattr(row.get('time', row.name), 'date') else None) )) logger.info(f"从适配器获取 {symbol} {freq.value} 数据 {len(items)} 条") return items except ImportError: logger.warning("AmazingDataService 未安装,无法从适配器获取数据") return [] except Exception as e: logger.error(f"从适配器获取数据失败: {e}") return [] def _save_klines_to_db( self, symbol: str, freq: Frequency, items: List[FuturesKLineItem] ) -> int: """ 保存 K 线数据到数据库 Args: symbol: 合约代码 freq: K 线周期 items: K 线数据列表 Returns: int: 保存的数量 """ try: count = self.repository.save_klines(symbol, freq, items) logger.info(f"保存 {symbol} {freq.value} 数据 {count} 条") return count except Exception as e: logger.error(f"保存数据失败: {e}") return 0 async def sync_klines( self, symbol: str, freq: Frequency, days: int = 30 ) -> Dict[str, Any]: """ 同步 K 线数据 Args: symbol: 合约代码 freq: K 线周期 days: 同步天数 Returns: Dict: 同步结果 """ end = datetime.now() start = end - timedelta(days=days) # 获取最新时间戳 latest = self.repository.get_latest_timestamp(symbol, freq) if latest and latest > start: start = latest + timedelta(seconds=1) if start >= end: return { "symbol": symbol, "freq": freq.value, "status": "already_synced", "count": 0 } # 从适配器获取数据 items = await self._fetch_from_adapter(symbol, freq, start, end) # 保存到数据库 count = self._save_klines_to_db(symbol, freq, items) return { "symbol": symbol, "freq": freq.value, "status": "synced", "count": count, "start": start.isoformat(), "end": end.isoformat() } # 服务工厂函数 def get_futures_kline_service(db: Session) -> FuturesKLineService: """获取期货 K 线服务实例""" return FuturesKLineService(db)