You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

440 lines
18 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# 数据获取模块
import os
import time
import pandas as pd
from typing import Dict, Optional, List
from qihuo_analyzer.utils.config_manager import config_manager
from qihuo_analyzer.data.api_adapters import DataAdapterFactory
class DataFetcher:
"""数据获取器"""
def __init__(self):
# 使用适配器工厂创建数据适配器
self.adapter = DataAdapterFactory.create_adapter()
self.api_connected = False
def connect(self) -> bool:
"""连接API"""
try:
# 使用适配器的connect方法
success = self.adapter.connect()
self.api_connected = success
return success
except Exception as e:
print(f"API连接失败{e}")
self.api_connected = False
return False
def disconnect(self):
"""断开连接"""
if self.api_connected:
try:
# 使用适配器的disconnect方法
self.adapter.disconnect()
self.api_connected = False
except:
pass
def get_product_name_cn(self, symbol: str) -> str:
"""获取合约的中文名称
Args:
symbol: 合约代码,如 'CU2603'
Returns:
合约的中文名称,如 ''
"""
# 品种中文名称映射
product_name_map = {
'CU': '',
'AL': '',
'ZN': '',
'PB': '',
'NI': '',
'SN': '',
'AU': '黄金',
'AG': '白银',
'RB': '螺纹钢',
'HC': '热轧卷板',
'BU': '沥青',
'RU': '橡胶',
'FU': '燃油',
'SC': '原油',
'I': '铁矿石',
'J': '焦炭',
'JM': '焦煤',
'A': '大豆',
'B': '豆粕',
'M': '豆粕',
'Y': '豆油',
'P': '棕榈油',
'C': '玉米',
'CS': '玉米淀粉',
'L': '聚乙烯',
'V': '聚氯乙烯',
'PP': '聚丙烯',
'TA': 'PTA',
'CF': '棉花',
'SR': '白糖',
'MA': '甲醇',
'ZC': '动力煤',
'FG': '玻璃',
'RM': '菜籽粕',
'OI': '菜籽油',
'RS': '菜籽',
'WH': '强麦',
'JR': '粳稻',
'LR': '晚籼稻',
}
if len(symbol) >= 2:
product_code = symbol[:2].upper()
return product_name_map.get(product_code, product_code)
else:
return symbol
def get_kline_data(self, symbol: str, duration: str, count: int = 200) -> Optional[pd.DataFrame]:
"""获取K线数据
Args:
symbol: 合约代码
duration: 时间周期,如 '1m', '5m', '15m', '1h', '1d'
count: 数据数量
Returns:
K线数据DataFrame如果无法获取真实数据则返回模拟数据
"""
try:
# 使用适配器的get_kline_data方法
result = self.adapter.get_kline_data(symbol, duration, count)
if result is None:
# 如果适配器返回None使用模拟数据
print("适配器返回None使用模拟K线数据")
return self._get_mock_kline_data(symbol, duration, count)
return result
except Exception as e:
print(f"获取K线数据失败{e}")
return self._get_mock_kline_data(symbol, duration, count)
def get_tick_data(self, symbol: str, count: int = 1000) -> Optional[pd.DataFrame]:
"""获取Tick数据"""
try:
# 使用适配器的get_tick_data方法
result = self.adapter.get_tick_data(symbol, count)
if result is None:
# 如果适配器返回None使用模拟数据
return self._get_mock_tick_data(symbol, count)
return result
except Exception as e:
print(f"获取Tick数据失败{e}")
return self._get_mock_tick_data(symbol, count)
def get_contract_info(self, symbol: str) -> Optional[Dict]:
"""获取合约信息"""
try:
# 使用适配器的get_contract_info方法
result = self.adapter.get_contract_info(symbol)
if result is None:
# 如果适配器返回None使用模拟数据
return self._get_mock_contract_info(symbol)
return result
except Exception as e:
print(f"获取合约信息失败:{e}")
return self._get_mock_contract_info(symbol)
def get_market_data(self, symbols: List[str]) -> Dict[str, Dict]:
"""批量获取市场数据"""
try:
# 使用适配器的get_market_data方法
result = self.adapter.get_market_data(symbols)
if result:
return result
else:
# 如果适配器返回空,使用模拟数据
market_data = {}
for symbol in symbols:
market_data[symbol] = self._get_mock_market_data(symbol)
return market_data
except Exception as e:
print(f"获取市场数据失败:{e}")
# 使用模拟数据
market_data = {}
for symbol in symbols:
market_data[symbol] = self._get_mock_market_data(symbol)
return market_data
def _get_mock_kline_data(self, symbol: str, duration: str, count: int) -> pd.DataFrame:
"""获取模拟K线数据"""
# 生成时间序列
end_time = pd.Timestamp.now()
if duration == '1m':
freq = '1T'
elif duration == '5m':
freq = '5T'
elif duration == '15m':
freq = '15T'
elif duration == '30m':
freq = '30T'
elif duration == '1h':
freq = '1H'
elif duration == '1d':
freq = '1D'
else:
freq = '1H'
datetime_index = pd.date_range(end=end_time, periods=count, freq=freq)
# 生成随机价格数据
base_price = 3500
price_changes = np.random.normal(0, 5, count)
prices = base_price + np.cumsum(price_changes)
# 生成其他数据
opens = prices * (1 + np.random.normal(0, 0.001, count))
highs = np.maximum(prices, opens) * (1 + np.random.normal(0, 0.002, count))
lows = np.minimum(prices, opens) * (1 - np.random.normal(0, 0.002, count))
volumes = np.random.randint(1000, 10000, count)
open_interests = np.random.randint(10000, 100000, count)
# 创建DataFrame
df = pd.DataFrame({
'open': opens,
'high': highs,
'low': lows,
'close': prices,
'volume': volumes,
'open_interest': open_interests
}, index=datetime_index)
return df
def _get_mock_tick_data(self, symbol: str, count: int) -> pd.DataFrame:
"""获取模拟Tick数据"""
# 生成时间序列
end_time = pd.Timestamp.now()
datetime_index = pd.date_range(end=end_time, periods=count, freq='1S')
# 生成随机价格数据
base_price = 3500
price_changes = np.random.normal(0, 0.5, count)
last_prices = base_price + np.cumsum(price_changes)
# 生成其他数据
volumes = np.random.randint(10, 100, count)
open_interests = np.random.randint(10000, 100000, count)
bid_prices = last_prices * (1 - np.random.normal(0, 0.0005, count))
ask_prices = last_prices * (1 + np.random.normal(0, 0.0005, count))
bid_volumes = np.random.randint(10, 50, count)
ask_volumes = np.random.randint(10, 50, count)
# 创建DataFrame
df = pd.DataFrame({
'last_price': last_prices,
'volume': volumes,
'open_interest': open_interests,
'bid_price1': bid_prices,
'bid_volume1': bid_volumes,
'ask_price1': ask_prices,
'ask_volume1': ask_volumes
}, index=datetime_index)
return df
def _get_mock_contract_info(self, symbol: str) -> Dict:
"""获取模拟合约信息"""
return {
'symbol': symbol,
'name': symbol,
'exchange': 'SHFE',
'product': symbol[:2],
'price_tick': 1,
'volume_multiple': 10,
'margin_rate': 0.1,
'expire_datetime': int(time.time() * 1e9) + 90 * 24 * 3600 * 1e9,
'create_datetime': int(time.time() * 1e9) - 180 * 24 * 3600 * 1e9
}
def _get_mock_market_data(self, symbol: str) -> Dict:
"""获取模拟市场数据"""
base_price = 3500
return {
'latest_price': base_price + np.random.normal(0, 10),
'open': base_price,
'high': base_price + 20,
'low': base_price - 20,
'pre_close': base_price,
'volume': np.random.randint(10000, 100000),
'open_interest': np.random.randint(100000, 1000000),
'bid_price1': base_price - 1,
'ask_price1': base_price + 1
}
def get_all_symbols(self) -> List[str]:
"""获取所有品种列表
Returns:
List[str]: 所有品种的合约代码列表
"""
try:
# 使用适配器的get_all_symbols方法
result = self.adapter.get_all_symbols()
if result:
return result
else:
# 如果适配器返回空,使用本地枚举数据
print("使用本地枚举品种列表")
symbols_by_exchange = self.get_all_symbols_by_exchange()
symbols = []
for exchange, products in symbols_by_exchange.items():
for product, product_contracts in products.items():
# 使用每个品种的第一个合约作为代表
if product_contracts:
symbols.append(product_contracts[0])
return symbols
except Exception as e:
print(f"获取所有品种列表失败:{e}")
return self._get_mock_all_symbols()
def _get_mock_all_symbols(self) -> List[str]:
"""获取模拟品种列表"""
# 返回用户指定的所有期货品种
return [
"AU2603", "AG2603", "CU2603", "NI2603", "SN2603", "FG2603",
"LY2603", "SA2603", "JM2603", "RB2603", "ALO2603", "MA2603",
"V2603", "FU2603", "SC2603", "AL2603", "P2603", "LI2603",
"SI2603", "RU2603", "BR2603", "ZN2603", "NR2603", "SP2603",
"IM2603", "IC2603", "LU2603", "IH2603"
]
def get_all_symbols_by_exchange(self) -> Dict[str, Dict[str, List[str]]]:
"""获取所有品种列表,按交易所-合约划分
Returns:
Dict[str, Dict[str, List[str]]]: 按交易所-合约划分的品种列表
"""
# 本地枚举数据,按交易所-合约划分
symbols_by_exchange = {
"SHFE": { # 上海期货交易所
"AU": ["AU2603", "AU2604", "AU2605", "AU2606", "AU2607", "AU2608", "AU2609"], # 黄金
"AG": ["AG2603", "AG2604", "AG2605", "AG2606", "AG2607", "AG2608", "AG2609"], # 白银
"CU": ["CU2603", "CU2604", "CU2605", "CU2606", "CU2607", "CU2608", "CU2609"]#, # 铜
# "NI": ["NI2603", "NI2604", "NI2605", "NI2606", "NI2607", "NI2608", "NI2609"], # 镍
# "SN": ["SN2603", "SN2604", "SN2605", "SN2606", "SN2607", "SN2608", "SN2609"], # 锡
# "FG": ["FG2603", "FG2604", "FG2605", "FG2606", "FG2607", "FG2608", "FG2609"], # 玻璃
# "RB": ["RB2603", "RB2604", "RB2605", "RB2606", "RB2607", "RB2608", "RB2609"], # 螺纹钢
# "AL": ["AL2603", "AL2604", "AL2605", "AL2606", "AL2607", "AL2608", "AL2609"], # 铝
# "ZN": ["ZN2603", "ZN2604", "ZN2605", "ZN2606", "ZN2607", "ZN2608", "ZN2609"], # 锌
# "RU": ["RU2603", "RU2604", "RU2605", "RU2606", "RU2607", "RU2608", "RU2609"], # 橡胶
# "NR": ["NR2603", "NR2604", "NR2605", "NR2606", "NR2607", "NR2608", "NR2609"], # 20号胶
# "FU": ["FU2603", "FU2604", "FU2605", "FU2606", "FU2607", "FU2608", "FU2609"], # 燃油
# "SC": ["SC2603", "SC2604", "SC2605", "SC2606", "SC2607", "SC2608", "SC2609"], # 原油
# "LU": ["LU2603", "LU2604", "LU2605", "LU2606", "LU2607", "LU2608", "LU2609"], # 低硫燃油
# "ALO": ["ALO2603", "ALO2604", "ALO2605", "ALO2606", "ALO2607", "ALO2608", "ALO2609"], # 氧化铝
# "LI": ["LI2603", "LI2604", "LI2605", "LI2606", "LI2607", "LI2608", "LI2609"], # 碳酸锂
# "SI": ["SI2603", "SI2604", "SI2605", "SI2606", "SI2607", "SI2608", "SI2609"] # 工业硅
}#,
# "INE": { # 上海国际能源交易中心
# "SC": ["SC2603", "SC2604", "SC2605", "SC2606", "SC2607", "SC2608", "SC2609"], # 原油
# "LU": ["LU2603", "LU2604", "LU2605", "LU2606", "LU2607", "LU2608", "LU2609"] # 低硫燃油
# },
# "DCE": { # 大连商品交易所
# "JM": ["JM2603", "JM2604", "JM2605", "JM2606", "JM2607", "JM2608", "JM2609"], # 焦煤
# "P": ["P2603", "P2604", "P2605", "P2606", "P2607", "P2608", "P2609"], # 棕榈油
# "V": ["V2603", "V2604", "V2605", "V2606", "V2607", "V2608", "V2609"], # PVC
# "MA": ["MA2603", "MA2604", "MA2605", "MA2606", "MA2607", "MA2608", "MA2609"], # 甲醇
# "BR": ["BR2603", "BR2604", "BR2605", "BR2606", "BR2607", "BR2608", "BR2609"] # 合成橡胶
# },
# "CZCE": { # 郑州商品交易所
# "FG": ["FG2603", "FG2604", "FG2605", "FG2606", "FG2607", "FG2608", "FG2609"], # 玻璃
# "MA": ["MA2603", "MA2604", "MA2605", "MA2606", "MA2607", "MA2608", "MA2609"], # 甲醇
# "V": ["V2603", "V2604", "V2605", "V2606", "V2607", "V2608", "V2609"], # PVC
# "SA": ["SA2603", "SA2604", "SA2605", "SA2606", "SA2607", "SA2608", "SA2609"], # 纯碱
# "LY": ["LY2603", "LY2604", "LY2605", "LY2606", "LY2607", "LY2608", "LY2609"] # 烧碱
# },
# "CFFEX": { # 中国金融期货交易所
# "IH": ["IH2603", "IH2604", "IH2605", "IH2606", "IH2607", "IH2608", "IH2609"], # 上证50
# "IC": ["IC2603", "IC2604", "IC2605", "IC2606", "IC2607", "IC2608", "IC2609"], # 中证500
# "IM": ["IM2603", "IM2604", "IM2605", "IM2606", "IM2607", "IM2608", "IM2609"] # 中证1000
# },
# "GEM": { # 广州期货交易所
# "SI": ["SI2603", "SI2604", "SI2605", "SI2606", "SI2607", "SI2608", "SI2609"], # 工业硅
# "SP": ["SP2603", "SP2604", "SP2605", "SP2606", "SP2607", "SP2608", "SP2609"] # 多晶硅
# }
}
return symbols_by_exchange
def get_contract_months(self, product_code: str) -> List[str]:
"""获取合约的所有月份
Args:
product_code: 品种代码,如 "CU"
Returns:
List[str]: 该品种的所有合约月份列表
"""
# 本地枚举的合约月份
contract_months = ["2603", "2604", "2605", "2606", "2607", "2608", "2609"]
# 生成完整的合约代码
return [f"{product_code}{month}" for month in contract_months]
def get_contracts(self, exchange: str = '', symbol: str = '') -> List[Dict]:
"""获取合约列表
Args:
exchange: 交易所代码,如 'SHFE'
symbol: 品种代码,如 'CU'
Returns:
List[Dict]: 合约列表,每个合约包含代码、名称等信息
"""
try:
# 获取所有品种按交易所划分
symbols_by_exchange = self.get_all_symbols_by_exchange()
contracts = []
# 遍历交易所
for exch, products in symbols_by_exchange.items():
# 如果指定了交易所,只处理该交易所
if exchange and exch != exchange:
continue
# 遍历品种
for product, product_contracts in products.items():
# 如果指定了品种,只处理该品种
if symbol and product != symbol:
continue
# 获取品种中文名称
product_name = self.get_product_name_cn(product)
# 遍历合约
for contract in product_contracts:
contracts.append({
'symbol': contract,
'product': product,
'product_name': product_name,
'exchange': exch,
'month': contract[-4:]
})
return contracts
except Exception as e:
print(f"获取合约列表失败:{e}")
# 返回模拟数据
return [
{'symbol': 'CU2603', 'product': 'CU', 'product_name': '', 'exchange': 'SHFE', 'month': '2603'},
{'symbol': 'AL2603', 'product': 'AL', 'product_name': '', 'exchange': 'SHFE', 'month': '2603'},
{'symbol': 'ZN2603', 'product': 'ZN', 'product_name': '', 'exchange': 'SHFE', 'month': '2603'}
]
# 导入numpy
import numpy as np