# 数据获取模块 import os import time import pandas as pd from typing import Dict, Optional, List from qihuo_analyzer.utils.config_manager import config_manager from qihuo_analyzer.data.api_adapters import DataAdapterFactory class DataFetcher: """数据获取器""" def __init__(self): # 使用适配器工厂创建数据适配器 self.adapter = DataAdapterFactory.create_adapter() self.api_connected = False def connect(self) -> bool: """连接API""" try: # 使用适配器的connect方法 success = self.adapter.connect() self.api_connected = success return success except Exception as e: print(f"API连接失败:{e}") self.api_connected = False return False def disconnect(self): """断开连接""" if self.api_connected: try: # 使用适配器的disconnect方法 self.adapter.disconnect() self.api_connected = False except: pass def get_product_name_cn(self, symbol: str) -> str: """获取合约的中文名称 Args: symbol: 合约代码,如 'CU2603' Returns: 合约的中文名称,如 '铜' """ # 品种中文名称映射 product_name_map = { 'CU': '铜', 'AL': '铝', 'ZN': '锌', 'PB': '铅', 'NI': '镍', 'SN': '锡', 'AU': '黄金', 'AG': '白银', 'RB': '螺纹钢', 'HC': '热轧卷板', 'BU': '沥青', 'RU': '橡胶', 'FU': '燃油', 'SC': '原油', 'I': '铁矿石', 'J': '焦炭', 'JM': '焦煤', 'A': '大豆', 'B': '豆粕', 'M': '豆粕', 'Y': '豆油', 'P': '棕榈油', 'C': '玉米', 'CS': '玉米淀粉', 'L': '聚乙烯', 'V': '聚氯乙烯', 'PP': '聚丙烯', 'TA': 'PTA', 'CF': '棉花', 'SR': '白糖', 'MA': '甲醇', 'ZC': '动力煤', 'FG': '玻璃', 'RM': '菜籽粕', 'OI': '菜籽油', 'RS': '菜籽', 'WH': '强麦', 'JR': '粳稻', 'LR': '晚籼稻', } if len(symbol) >= 2: product_code = symbol[:2].upper() return product_name_map.get(product_code, product_code) else: return symbol def get_kline_data(self, symbol: str, duration: str, count: int = 200) -> Optional[pd.DataFrame]: """获取K线数据 Args: symbol: 合约代码 duration: 时间周期,如 '1m', '5m', '15m', '1h', '1d' count: 数据数量 Returns: K线数据DataFrame,如果无法获取真实数据则返回模拟数据 """ try: # 使用适配器的get_kline_data方法 result = self.adapter.get_kline_data(symbol, duration, count) if result is None: # 如果适配器返回None,使用模拟数据 print("适配器返回None,使用模拟K线数据") return self._get_mock_kline_data(symbol, duration, count) return result except Exception as e: print(f"获取K线数据失败:{e}") return self._get_mock_kline_data(symbol, duration, count) def get_tick_data(self, symbol: str, count: int = 1000) -> Optional[pd.DataFrame]: """获取Tick数据""" try: # 使用适配器的get_tick_data方法 result = self.adapter.get_tick_data(symbol, count) if result is None: # 如果适配器返回None,使用模拟数据 return self._get_mock_tick_data(symbol, count) return result except Exception as e: print(f"获取Tick数据失败:{e}") return self._get_mock_tick_data(symbol, count) def get_contract_info(self, symbol: str) -> Optional[Dict]: """获取合约信息""" try: # 使用适配器的get_contract_info方法 result = self.adapter.get_contract_info(symbol) if result is None: # 如果适配器返回None,使用模拟数据 return self._get_mock_contract_info(symbol) return result except Exception as e: print(f"获取合约信息失败:{e}") return self._get_mock_contract_info(symbol) def get_market_data(self, symbols: List[str]) -> Dict[str, Dict]: """批量获取市场数据""" try: # 使用适配器的get_market_data方法 result = self.adapter.get_market_data(symbols) if result: return result else: # 如果适配器返回空,使用模拟数据 market_data = {} for symbol in symbols: market_data[symbol] = self._get_mock_market_data(symbol) return market_data except Exception as e: print(f"获取市场数据失败:{e}") # 使用模拟数据 market_data = {} for symbol in symbols: market_data[symbol] = self._get_mock_market_data(symbol) return market_data def _get_mock_kline_data(self, symbol: str, duration: str, count: int) -> pd.DataFrame: """获取模拟K线数据""" # 生成时间序列 end_time = pd.Timestamp.now() if duration == '1m': freq = '1T' elif duration == '5m': freq = '5T' elif duration == '15m': freq = '15T' elif duration == '30m': freq = '30T' elif duration == '1h': freq = '1H' elif duration == '1d': freq = '1D' else: freq = '1H' datetime_index = pd.date_range(end=end_time, periods=count, freq=freq) # 生成随机价格数据 base_price = 3500 price_changes = np.random.normal(0, 5, count) prices = base_price + np.cumsum(price_changes) # 生成其他数据 opens = prices * (1 + np.random.normal(0, 0.001, count)) highs = np.maximum(prices, opens) * (1 + np.random.normal(0, 0.002, count)) lows = np.minimum(prices, opens) * (1 - np.random.normal(0, 0.002, count)) volumes = np.random.randint(1000, 10000, count) open_interests = np.random.randint(10000, 100000, count) # 创建DataFrame df = pd.DataFrame({ 'open': opens, 'high': highs, 'low': lows, 'close': prices, 'volume': volumes, 'open_interest': open_interests }, index=datetime_index) return df def _get_mock_tick_data(self, symbol: str, count: int) -> pd.DataFrame: """获取模拟Tick数据""" # 生成时间序列 end_time = pd.Timestamp.now() datetime_index = pd.date_range(end=end_time, periods=count, freq='1S') # 生成随机价格数据 base_price = 3500 price_changes = np.random.normal(0, 0.5, count) last_prices = base_price + np.cumsum(price_changes) # 生成其他数据 volumes = np.random.randint(10, 100, count) open_interests = np.random.randint(10000, 100000, count) bid_prices = last_prices * (1 - np.random.normal(0, 0.0005, count)) ask_prices = last_prices * (1 + np.random.normal(0, 0.0005, count)) bid_volumes = np.random.randint(10, 50, count) ask_volumes = np.random.randint(10, 50, count) # 创建DataFrame df = pd.DataFrame({ 'last_price': last_prices, 'volume': volumes, 'open_interest': open_interests, 'bid_price1': bid_prices, 'bid_volume1': bid_volumes, 'ask_price1': ask_prices, 'ask_volume1': ask_volumes }, index=datetime_index) return df def _get_mock_contract_info(self, symbol: str) -> Dict: """获取模拟合约信息""" return { 'symbol': symbol, 'name': symbol, 'exchange': 'SHFE', 'product': symbol[:2], 'price_tick': 1, 'volume_multiple': 10, 'margin_rate': 0.1, 'expire_datetime': int(time.time() * 1e9) + 90 * 24 * 3600 * 1e9, 'create_datetime': int(time.time() * 1e9) - 180 * 24 * 3600 * 1e9 } def _get_mock_market_data(self, symbol: str) -> Dict: """获取模拟市场数据""" base_price = 3500 return { 'latest_price': base_price + np.random.normal(0, 10), 'open': base_price, 'high': base_price + 20, 'low': base_price - 20, 'pre_close': base_price, 'volume': np.random.randint(10000, 100000), 'open_interest': np.random.randint(100000, 1000000), 'bid_price1': base_price - 1, 'ask_price1': base_price + 1 } def get_all_symbols(self) -> List[str]: """获取所有品种列表 Returns: List[str]: 所有品种的合约代码列表 """ try: # 使用适配器的get_all_symbols方法 result = self.adapter.get_all_symbols() if result: return result else: # 如果适配器返回空,使用本地枚举数据 print("使用本地枚举品种列表") symbols_by_exchange = self.get_all_symbols_by_exchange() symbols = [] for exchange, products in symbols_by_exchange.items(): for product, product_contracts in products.items(): # 使用每个品种的第一个合约作为代表 if product_contracts: symbols.append(product_contracts[0]) return symbols except Exception as e: print(f"获取所有品种列表失败:{e}") return self._get_mock_all_symbols() def _get_mock_all_symbols(self) -> List[str]: """获取模拟品种列表""" # 返回用户指定的所有期货品种 return [ "AU2603", "AG2603", "CU2603", "NI2603", "SN2603", "FG2603", "LY2603", "SA2603", "JM2603", "RB2603", "ALO2603", "MA2603", "V2603", "FU2603", "SC2603", "AL2603", "P2603", "LI2603", "SI2603", "RU2603", "BR2603", "ZN2603", "NR2603", "SP2603", "IM2603", "IC2603", "LU2603", "IH2603" ] def get_all_symbols_by_exchange(self) -> Dict[str, Dict[str, List[str]]]: """获取所有品种列表,按交易所-合约划分 Returns: Dict[str, Dict[str, List[str]]]: 按交易所-合约划分的品种列表 """ # 本地枚举数据,按交易所-合约划分 symbols_by_exchange = { "SHFE": { # 上海期货交易所 "AU": ["AU2603", "AU2604", "AU2605", "AU2606", "AU2607", "AU2608", "AU2609"], # 黄金 "AG": ["AG2603", "AG2604", "AG2605", "AG2606", "AG2607", "AG2608", "AG2609"], # 白银 "CU": ["CU2603", "CU2604", "CU2605", "CU2606", "CU2607", "CU2608", "CU2609"]#, # 铜 # "NI": ["NI2603", "NI2604", "NI2605", "NI2606", "NI2607", "NI2608", "NI2609"], # 镍 # "SN": ["SN2603", "SN2604", "SN2605", "SN2606", "SN2607", "SN2608", "SN2609"], # 锡 # "FG": ["FG2603", "FG2604", "FG2605", "FG2606", "FG2607", "FG2608", "FG2609"], # 玻璃 # "RB": ["RB2603", "RB2604", "RB2605", "RB2606", "RB2607", "RB2608", "RB2609"], # 螺纹钢 # "AL": ["AL2603", "AL2604", "AL2605", "AL2606", "AL2607", "AL2608", "AL2609"], # 铝 # "ZN": ["ZN2603", "ZN2604", "ZN2605", "ZN2606", "ZN2607", "ZN2608", "ZN2609"], # 锌 # "RU": ["RU2603", "RU2604", "RU2605", "RU2606", "RU2607", "RU2608", "RU2609"], # 橡胶 # "NR": ["NR2603", "NR2604", "NR2605", "NR2606", "NR2607", "NR2608", "NR2609"], # 20号胶 # "FU": ["FU2603", "FU2604", "FU2605", "FU2606", "FU2607", "FU2608", "FU2609"], # 燃油 # "SC": ["SC2603", "SC2604", "SC2605", "SC2606", "SC2607", "SC2608", "SC2609"], # 原油 # "LU": ["LU2603", "LU2604", "LU2605", "LU2606", "LU2607", "LU2608", "LU2609"], # 低硫燃油 # "ALO": ["ALO2603", "ALO2604", "ALO2605", "ALO2606", "ALO2607", "ALO2608", "ALO2609"], # 氧化铝 # "LI": ["LI2603", "LI2604", "LI2605", "LI2606", "LI2607", "LI2608", "LI2609"], # 碳酸锂 # "SI": ["SI2603", "SI2604", "SI2605", "SI2606", "SI2607", "SI2608", "SI2609"] # 工业硅 }#, # "INE": { # 上海国际能源交易中心 # "SC": ["SC2603", "SC2604", "SC2605", "SC2606", "SC2607", "SC2608", "SC2609"], # 原油 # "LU": ["LU2603", "LU2604", "LU2605", "LU2606", "LU2607", "LU2608", "LU2609"] # 低硫燃油 # }, # "DCE": { # 大连商品交易所 # "JM": ["JM2603", "JM2604", "JM2605", "JM2606", "JM2607", "JM2608", "JM2609"], # 焦煤 # "P": ["P2603", "P2604", "P2605", "P2606", "P2607", "P2608", "P2609"], # 棕榈油 # "V": ["V2603", "V2604", "V2605", "V2606", "V2607", "V2608", "V2609"], # PVC # "MA": ["MA2603", "MA2604", "MA2605", "MA2606", "MA2607", "MA2608", "MA2609"], # 甲醇 # "BR": ["BR2603", "BR2604", "BR2605", "BR2606", "BR2607", "BR2608", "BR2609"] # 合成橡胶 # }, # "CZCE": { # 郑州商品交易所 # "FG": ["FG2603", "FG2604", "FG2605", "FG2606", "FG2607", "FG2608", "FG2609"], # 玻璃 # "MA": ["MA2603", "MA2604", "MA2605", "MA2606", "MA2607", "MA2608", "MA2609"], # 甲醇 # "V": ["V2603", "V2604", "V2605", "V2606", "V2607", "V2608", "V2609"], # PVC # "SA": ["SA2603", "SA2604", "SA2605", "SA2606", "SA2607", "SA2608", "SA2609"], # 纯碱 # "LY": ["LY2603", "LY2604", "LY2605", "LY2606", "LY2607", "LY2608", "LY2609"] # 烧碱 # }, # "CFFEX": { # 中国金融期货交易所 # "IH": ["IH2603", "IH2604", "IH2605", "IH2606", "IH2607", "IH2608", "IH2609"], # 上证50 # "IC": ["IC2603", "IC2604", "IC2605", "IC2606", "IC2607", "IC2608", "IC2609"], # 中证500 # "IM": ["IM2603", "IM2604", "IM2605", "IM2606", "IM2607", "IM2608", "IM2609"] # 中证1000 # }, # "GEM": { # 广州期货交易所 # "SI": ["SI2603", "SI2604", "SI2605", "SI2606", "SI2607", "SI2608", "SI2609"], # 工业硅 # "SP": ["SP2603", "SP2604", "SP2605", "SP2606", "SP2607", "SP2608", "SP2609"] # 多晶硅 # } } return symbols_by_exchange def get_contract_months(self, product_code: str) -> List[str]: """获取合约的所有月份 Args: product_code: 品种代码,如 "CU" Returns: List[str]: 该品种的所有合约月份列表 """ # 本地枚举的合约月份 contract_months = ["2603", "2604", "2605", "2606", "2607", "2608", "2609"] # 生成完整的合约代码 return [f"{product_code}{month}" for month in contract_months] def get_contracts(self, exchange: str = '', symbol: str = '') -> List[Dict]: """获取合约列表 Args: exchange: 交易所代码,如 'SHFE' symbol: 品种代码,如 'CU' Returns: List[Dict]: 合约列表,每个合约包含代码、名称等信息 """ try: # 获取所有品种按交易所划分 symbols_by_exchange = self.get_all_symbols_by_exchange() contracts = [] # 遍历交易所 for exch, products in symbols_by_exchange.items(): # 如果指定了交易所,只处理该交易所 if exchange and exch != exchange: continue # 遍历品种 for product, product_contracts in products.items(): # 如果指定了品种,只处理该品种 if symbol and product != symbol: continue # 获取品种中文名称 product_name = self.get_product_name_cn(product) # 遍历合约 for contract in product_contracts: contracts.append({ 'symbol': contract, 'product': product, 'product_name': product_name, 'exchange': exch, 'month': contract[-4:] }) return contracts except Exception as e: print(f"获取合约列表失败:{e}") # 返回模拟数据 return [ {'symbol': 'CU2603', 'product': 'CU', 'product_name': '铜', 'exchange': 'SHFE', 'month': '2603'}, {'symbol': 'AL2603', 'product': 'AL', 'product_name': '铝', 'exchange': 'SHFE', 'month': '2603'}, {'symbol': 'ZN2603', 'product': 'ZN', 'product_name': '锌', 'exchange': 'SHFE', 'month': '2603'} ] def get_main_contracts(self) -> Dict[str, str]: """获取主力合约 Returns: Dict[str, str]: 品种代码到主力合约代码的映射 """ try: # 使用适配器的get_main_contracts方法 result = self.adapter.get_main_contracts() if result: return result else: # 如果适配器返回空,使用模拟数据 print("使用模拟主力合约数据") return self._get_mock_main_contracts() except Exception as e: print(f"获取主力合约失败:{e}") return self._get_mock_main_contracts() def _get_mock_main_contracts(self) -> Dict[str, str]: """获取模拟主力合约数据""" # 模拟主力合约数据 return { 'AU': 'AU2603', # 黄金 'AG': 'AG2603', # 白银 'CU': 'CU2603', # 铜 'NI': 'NI2603', # 镍 'SN': 'SN2603' # 锡 } # 导入numpy import numpy as np