""" amazingData 数据服务 基于银河证券星耀数智量化平台 SDK 的数据接入服务 """ import logging from datetime import datetime, date, timedelta from typing import List, Dict, Optional, Any from contextlib import contextmanager import threading from app.config import settings from app.services.amazing_data_adapter import ( AmazingDataAdapter, DataSourceConfig, SecurityType, Period, Market ) logger = logging.getLogger(__name__) class AmazingDataService: """ amazingData 数据服务 提供登录/登出、K 线数据、实时行情等功能 使用连接池管理连接,避免连接数超限 """ _instance = None _lock = threading.Lock() def __new__(cls): """单例模式""" if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self): if self._initialized: return self._initialized = True self._adapter: Optional[AmazingDataAdapter] = None self._connected = False self._connection_lock = threading.Lock() # 连接配置 self._config = DataSourceConfig( username=settings.AMAZING_DATA_ACCOUNT, password=settings.AMAZING_DATA_PASSWORD, host=settings.AMAZING_DATA_HOST, port=settings.AMAZING_DATA_PORT, local_path='/app/data/amazing_data_cache/', use_local_cache=True ) def connect(self) -> bool: """ 连接到 amazingData Returns: bool: 连接是否成功 """ with self._connection_lock: if self._connected and self._adapter: logger.info("Already connected to amazingData") return True try: logger.info(f"Connecting to amazingData at {settings.AMAZING_DATA_HOST}:{settings.AMAZING_DATA_PORT}") self._adapter = AmazingDataAdapter(self._config) if self._adapter.connect(): self._connected = True logger.info("Successfully connected to amazingData") return True else: logger.error("Failed to connect to amazingData") return False except Exception as e: logger.error(f"Error connecting to amazingData: {e}") self._connected = False return False def disconnect(self): """断开连接""" with self._connection_lock: if self._adapter: try: self._adapter.disconnect() logger.info("Disconnected from amazingData") except Exception as e: logger.error(f"Error disconnecting: {e}") finally: self._adapter = None self._connected = False @contextmanager def get_connection(self): """ 上下文管理器获取连接 确保使用后正确登出 Usage: with service.get_connection() as adapter: data = adapter.get_kline_data(...) """ connected = False try: if not self._connected: connected = self.connect() else: connected = True if not connected: raise Exception("无法连接到 amazingData") yield self._adapter except Exception as e: logger.error(f"Error in connection context: {e}") raise finally: # 注意:不在这里调用 logout,由调用者管理生命周期 # 避免频繁登录登出导致连接数超限 pass def ensure_connected(self) -> bool: """确保已连接""" if not self._connected: return self.connect() return True # ============== K 线数据 ============== def get_kline_data( self, symbol: str, period: str, start_date: str, end_date: str, security_type: str = "EXTRA_FUTURE" ) -> List[Dict]: """ 获取 K 线数据 Args: symbol: 证券代码 (如 IF2406) period: 周期 (min1, min5, min15, min30, min60, day) start_date: 开始日期 (YYYY-MM-DD) end_date: 结束日期 (YYYY-MM-DD) security_type: 证券类型 Returns: List[Dict]: K 线数据列表 """ try: if not self.ensure_connected(): raise Exception("未连接到数据源") # 转换周期 period_map = { '1m': Period.MIN1, '5m': Period.MIN5, '15m': Period.MIN15, '30m': Period.MIN30, '60m': Period.MIN60, '1h': Period.MIN60, '1d': Period.DAILY, '1w': Period.WEEKLY, '1mo': Period.MONTHLY } period_enum = period_map.get(period, Period.DAILY) # 获取数据 # adapter 的 get_kline 返回 Dict[code, DataFrame] kline_dict = self._adapter.get_kline( codes=symbol, start_date=start_date, end_date=end_date, period=period_enum ) # 从字典中获取 DataFrame df = kline_dict.get(symbol, None) if isinstance(kline_dict, dict) else None if df is None or df.empty: logger.warning(f"No kline data for {symbol} {period}") return [] # 转换为字典列表 result = df.to_dict('records') logger.info(f"Retrieved {len(result)} kline records for {symbol}") return result except Exception as e: logger.error(f"Error getting kline data for {symbol}: {e}") raise def get_realtime_quotes(self, symbols: List[str]) -> List[Dict]: """ 获取实时行情 Args: symbols: 证券代码列表 Returns: List[Dict]: 实时行情数据 """ try: if not self.ensure_connected(): raise Exception("未连接到数据源") df = self._adapter.get_realtime_quotes(symbols) if df is None or df.empty: return [] result = df.to_dict('records') logger.info(f"Retrieved realtime quotes for {len(result)} symbols") return result except Exception as e: logger.error(f"Error getting realtime quotes: {e}") raise def get_security_codes( self, security_type: str = "EXTRA_FUTURE", market: Optional[str] = None ) -> List[Dict]: """ 获取证券代码列表 Args: security_type: 证券类型 (EXTRA_FUTURE, EXTRA_STOCK_A, etc.) market: 市场 (SH, SZ, BJ) Returns: List[Dict]: 证券代码列表 """ try: if not self.ensure_connected(): raise Exception("未连接到数据源") # 将字符串转换为 SecurityType 枚举 sec_type = SecurityType(security_type) # 使用 adapter 的 get_code_info 获取详细信息 df = self._adapter.get_code_info(security_type=sec_type) if df is None or df.empty: return [] result = df.to_dict('records') logger.info(f"Retrieved {len(result)} security codes") return result except Exception as e: logger.error(f"Error getting security codes: {e}") raise def get_tick_data( self, symbol: str, date: str, security_type: str = "EXTRA_FUTURE" ) -> List[Dict]: """ 获取 Tick 数据 Args: symbol: 证券代码 date: 日期 (YYYY-MM-DD) security_type: 证券类型 Returns: List[Dict]: Tick 数据 """ try: if not self.ensure_connected(): raise Exception("未连接到数据源") df = self._adapter.get_tick_data( code=symbol, date=date, security_type=security_type ) if df is None or df.empty: return [] result = df.to_dict('records') logger.info(f"Retrieved {len(result)} tick records for {symbol}") return result except Exception as e: logger.error(f"Error getting tick data for {symbol}: {e}") raise # 全局服务实例 amazing_data_service = AmazingDataService() def get_amazing_data_service() -> AmazingDataService: """获取 amazingData 服务实例""" return amazing_data_service