import asyncio import datetime from tqsdk import TqApi, TqAuth from typing import List, Dict, Any class TqApiService: # 类级缓存,存储用户名密码对应的TqApiService实例 _instance_cache = {} def __init__(self): self.api = None self.connected = False self.username = None self.password = None @classmethod async def get_instance(cls, username: str, password: str): """获取或创建TqApiService实例""" # 生成缓存键 cache_key = f"{username}:{password}" # 检查缓存中是否存在实例 if cache_key in cls._instance_cache: instance = cls._instance_cache[cache_key] # 检查实例是否仍然连接 if instance.connected: print(f"使用缓存的TqApiService实例 for {username}") return instance else: # 实例已断开连接,从缓存中移除 print(f"TqApiService实例 for {username} 已断开连接,从缓存中移除") del cls._instance_cache[cache_key] # 创建新实例 print(f"创建新的TqApiService实例 for {username}") instance = cls() instance.username = username instance.password = password # 连接到天勤服务器 await instance.connect(username, password) # 存入缓存 cls._instance_cache[cache_key] = instance return instance async def connect(self, username: str, password: str) -> bool: """连接到天勤服务器""" try: # 立即返回,不等待TqApi实例完全连接到天勤服务器 # 连接过程将在后台进行 self.username = username self.password = password self.connected = True # 创建后台任务来初始化API实例和运行事件循环 asyncio.create_task(self._initialize_api()) return True except Exception as e: print(f"连接失败: {e}") self.connected = False return False async def _initialize_api(self): """初始化TqApi实例并运行事件循环""" try: print("正在初始化TqApi实例...") # 创建TqApi实例 self.api = TqApi(auth=TqAuth(self.username, self.password), debug=False) print("TqApi实例初始化完成") # 运行事件循环 await self._run_api() except Exception as e: print(f"初始化TqApi实例失败: {e}") self.connected = False async def _run_api(self): """运行TQAPI事件循环""" try: while self.connected and self.api: try: # 非阻塞方式检查更新 self.api.wait_update(0.1) # 超时0.1秒,避免完全阻塞 except Exception as e: print(f"API更新出错: {e}") await asyncio.sleep(0.1) # 让出CPU时间,避免阻塞其他任务 await asyncio.sleep(0.01) except Exception as e: print(f"API运行出错: {e}") self.connected = False async def get_contracts(self) -> List[Dict[str, Any]]: """获取合约列表""" if not self.connected: raise Exception("未连接到天勤服务器") if not self.api: # API实例尚未初始化,返回空列表 print("API实例尚未初始化,返回空合约列表") return [] try: # 获取所有合约 contracts = self.api.get_contracts() # 过滤期货合约,排除期权等其他类型 futures_contracts = [] for symbol, contract in contracts.items(): # 只保留期货合约 if contract["product_class"] == "FUTURE": futures_contracts.append({ "symbol": symbol, "name": contract["name"], "exchange": contract["exchange"], "product_id": contract["product_id"], "price_tick": contract["price_tick"], "volume_multiple": contract["volume_multiple"], "margin_rate": contract["margin_rate"], "expire_datetime": contract["expire_datetime"] }) return futures_contracts except Exception as e: print(f"获取合约列表失败: {e}") # 返回空列表,避免整个请求失败 return [] async def get_contract(self, symbol: str) -> Dict[str, Any]: """获取合约详情""" if not self.connected: raise Exception("未连接到天勤服务器") if not self.api: # API实例尚未初始化,返回默认值 print("API实例尚未初始化,返回默认合约详情") return { "symbol": symbol, "name": "", "exchange": "", "product_id": "", "price_tick": 0, "volume_multiple": 0, "margin_rate": 0, "expire_datetime": 0 } try: # 获取合约详情 contract = self.api.get_contract(symbol) return { "symbol": symbol, "name": contract["name"], "exchange": contract["exchange"], "product_id": contract["product_id"], "price_tick": contract["price_tick"], "volume_multiple": contract["volume_multiple"], "margin_rate": contract["margin_rate"], "expire_datetime": contract["expire_datetime"] } except Exception as e: print(f"获取合约详情失败: {e}") # 返回默认值,避免整个请求失败 return { "symbol": symbol, "name": "", "exchange": "", "product_id": "", "price_tick": 0, "volume_multiple": 0, "margin_rate": 0, "expire_datetime": 0 } async def get_klines(self, symbol: str, period: str, count: int) -> List[Dict[str, Any]]: """获取K线数据""" if not self.connected: raise Exception("未连接到天勤服务器") if not self.api: # API实例尚未初始化,返回空列表 print("API实例尚未初始化,返回空K线数据列表") return [] try: # 转换周期格式 duration_seconds = self._convert_period(period) # 获取K线数据 klines = self.api.get_kline_serial(symbol, duration_seconds, data_length=count) # 转换为前端需要的格式 result = [] for i in range(len(klines)): result.append({ "time": int(klines.iloc[i]["datetime"].timestamp()), "open": float(klines.iloc[i]["open"]), "high": float(klines.iloc[i]["high"]), "low": float(klines.iloc[i]["low"]), "close": float(klines.iloc[i]["close"]), "volume": float(klines.iloc[i]["volume"]) }) return result except Exception as e: print(f"获取K线数据失败: {e}") # 返回空列表,避免整个请求失败 return [] async def get_tick(self, symbol: str) -> Dict[str, Any]: """获取tick数据""" print(f"[TqApiService] 开始获取 tick 数据,symbol: {symbol}") try: print(f"[TqApiService] 检查连接状态: {self.connected}") if not self.connected: # 未连接到天勤服务器,返回默认数据 print("[TqApiService] 未连接到天勤服务器,返回默认tick数据") return { "last_price": 0, "pre_close": 0, "open": 0, "high": 0, "low": 0, "volume": 0, "open_interest": 0, "bid_price1": 0, "bid_volume1": 0, "ask_price1": 0, "ask_volume1": 0, "datetime": int(datetime.datetime.now().timestamp()) } print(f"[TqApiService] 检查API实例状态: {self.api is not None}") if not self.api: # API实例尚未初始化,返回默认数据 print("[TqApiService] API实例尚未初始化,返回默认tick数据") return { "last_price": 0, "pre_close": 0, "open": 0, "high": 0, "low": 0, "volume": 0, "open_interest": 0, "bid_price1": 0, "bid_volume1": 0, "ask_price1": 0, "ask_volume1": 0, "datetime": int(datetime.datetime.now().timestamp()) } # 获取实时行情 print(f"[TqApiService] 正在调用 self.api.get_quote({symbol})...") quote = self.api.get_quote(symbol) print(f"[TqApiService] 获取 quote 对象成功") # 尝试多次获取行情更新,避免单次超时 max_attempts = 3 print(f"[TqApiService] 开始尝试获取行情更新,最多 {max_attempts} 次") for attempt in range(max_attempts): try: # 等待行情更新,设置超时 print(f"[TqApiService] 尝试 {attempt + 1}/{max_attempts}: 调用 self.api.wait_update(1.0)...") updated = self.api.wait_update(1.0) # 1秒超时 print(f"[TqApiService] 尝试 {attempt + 1}/{max_attempts}: wait_update 返回: {updated}") if updated: print(f"[TqApiService] 获取行情更新成功") break print(f"[TqApiService] 尝试 {attempt + 1}/{max_attempts}: 获取 {symbol} 的行情信息超时") except Exception as e: print(f"[TqApiService] 尝试 {attempt + 1}/{max_attempts}: 获取 {symbol} 的行情信息出错: {e}") await asyncio.sleep(0.5) # 即使没有更新,也尝试返回当前数据 print(f"[TqApiService] 正在构建返回数据...") result = { "last_price": float(quote.get("last_price", 0)), "pre_close": float(quote.get("pre_close", 0)), "open": float(quote.get("open", 0)), "high": float(quote.get("high", 0)), "low": float(quote.get("low", 0)), "volume": float(quote.get("volume", 0)), "open_interest": float(quote.get("open_interest", 0)), "bid_price1": float(quote.get("bid_price1", 0)), "bid_volume1": float(quote.get("bid_volume1", 0)), "ask_price1": float(quote.get("ask_price1", 0)), "ask_volume1": float(quote.get("ask_volume1", 0)), "datetime": int(quote.get("datetime", datetime.datetime.now()).timestamp()) } print(f"[TqApiService] 获取 tick 数据成功,返回: {result}") return result except Exception as e: print(f"[TqApiService] 获取tick数据失败: {e}") import traceback traceback.print_exc() # 返回默认数据,避免整个请求失败 default_result = { "last_price": 0, "pre_close": 0, "open": 0, "high": 0, "low": 0, "volume": 0, "open_interest": 0, "bid_price1": 0, "bid_volume1": 0, "ask_price1": 0, "ask_volume1": 0, "datetime": int(datetime.datetime.now().timestamp()) } print(f"[TqApiService] 返回默认数据: {default_result}") return default_result async def disconnect(self) -> bool: """断开连接""" try: if self.api: self.connected = False self.api.close() self.api = None return True except Exception as e: print(f"断开连接失败: {e}") return False def _convert_period(self, period: str) -> int: """转换周期格式为秒""" period_map = { "1M": 60, "5M": 300, "15M": 900, "30M": 1800, "1H": 3600, "4H": 14400, "1D": 86400 } return period_map.get(period, 3600) # 默认1小时