You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

330 lines
13 KiB

import asyncio
import datetime
from tqsdk import TqApi, TqAuth
from typing import List, Dict, Any
class TqApiService:
# 类级缓存存储用户名密码对应的TqApiService实例
_instance_cache = {}
def __init__(self):
self.api = None
self.connected = False
self.username = None
self.password = None
@classmethod
async def get_instance(cls, username: str, password: str):
"""获取或创建TqApiService实例"""
# 生成缓存键
cache_key = f"{username}:{password}"
# 检查缓存中是否存在实例
if cache_key in cls._instance_cache:
instance = cls._instance_cache[cache_key]
# 检查实例是否仍然连接
if instance.connected:
print(f"使用缓存的TqApiService实例 for {username}")
return instance
else:
# 实例已断开连接,从缓存中移除
print(f"TqApiService实例 for {username} 已断开连接,从缓存中移除")
del cls._instance_cache[cache_key]
# 创建新实例
print(f"创建新的TqApiService实例 for {username}")
instance = cls()
instance.username = username
instance.password = password
# 连接到天勤服务器
await instance.connect(username, password)
# 存入缓存
cls._instance_cache[cache_key] = instance
return instance
async def connect(self, username: str, password: str) -> bool:
"""连接到天勤服务器"""
try:
# 立即返回不等待TqApi实例完全连接到天勤服务器
# 连接过程将在后台进行
self.username = username
self.password = password
self.connected = True
# 创建后台任务来初始化API实例和运行事件循环
asyncio.create_task(self._initialize_api())
return True
except Exception as e:
print(f"连接失败: {e}")
self.connected = False
return False
async def _initialize_api(self):
"""初始化TqApi实例并运行事件循环"""
try:
print("正在初始化TqApi实例...")
# 创建TqApi实例
self.api = TqApi(auth=TqAuth(self.username, self.password), debug=False)
print("TqApi实例初始化完成")
# 运行事件循环
await self._run_api()
except Exception as e:
print(f"初始化TqApi实例失败: {e}")
self.connected = False
async def _run_api(self):
"""运行TQAPI事件循环"""
try:
while self.connected and self.api:
try:
# 非阻塞方式检查更新
self.api.wait_update(0.1) # 超时0.1秒,避免完全阻塞
except Exception as e:
print(f"API更新出错: {e}")
await asyncio.sleep(0.1)
# 让出CPU时间避免阻塞其他任务
await asyncio.sleep(0.01)
except Exception as e:
print(f"API运行出错: {e}")
self.connected = False
async def get_contracts(self) -> List[Dict[str, Any]]:
"""获取合约列表"""
if not self.connected:
raise Exception("未连接到天勤服务器")
if not self.api:
# API实例尚未初始化返回空列表
print("API实例尚未初始化返回空合约列表")
return []
try:
# 获取所有合约
contracts = self.api.get_contracts()
# 过滤期货合约,排除期权等其他类型
futures_contracts = []
for symbol, contract in contracts.items():
# 只保留期货合约
if contract["product_class"] == "FUTURE":
futures_contracts.append({
"symbol": symbol,
"name": contract["name"],
"exchange": contract["exchange"],
"product_id": contract["product_id"],
"price_tick": contract["price_tick"],
"volume_multiple": contract["volume_multiple"],
"margin_rate": contract["margin_rate"],
"expire_datetime": contract["expire_datetime"]
})
return futures_contracts
except Exception as e:
print(f"获取合约列表失败: {e}")
# 返回空列表,避免整个请求失败
return []
async def get_contract(self, symbol: str) -> Dict[str, Any]:
"""获取合约详情"""
if not self.connected:
raise Exception("未连接到天勤服务器")
if not self.api:
# API实例尚未初始化返回默认值
print("API实例尚未初始化返回默认合约详情")
return {
"symbol": symbol,
"name": "",
"exchange": "",
"product_id": "",
"price_tick": 0,
"volume_multiple": 0,
"margin_rate": 0,
"expire_datetime": 0
}
try:
# 获取合约详情
contract = self.api.get_contract(symbol)
return {
"symbol": symbol,
"name": contract["name"],
"exchange": contract["exchange"],
"product_id": contract["product_id"],
"price_tick": contract["price_tick"],
"volume_multiple": contract["volume_multiple"],
"margin_rate": contract["margin_rate"],
"expire_datetime": contract["expire_datetime"]
}
except Exception as e:
print(f"获取合约详情失败: {e}")
# 返回默认值,避免整个请求失败
return {
"symbol": symbol,
"name": "",
"exchange": "",
"product_id": "",
"price_tick": 0,
"volume_multiple": 0,
"margin_rate": 0,
"expire_datetime": 0
}
async def get_klines(self, symbol: str, period: str, count: int) -> List[Dict[str, Any]]:
"""获取K线数据"""
if not self.connected:
raise Exception("未连接到天勤服务器")
if not self.api:
# API实例尚未初始化返回空列表
print("API实例尚未初始化返回空K线数据列表")
return []
try:
# 转换周期格式
duration_seconds = self._convert_period(period)
# 获取K线数据
klines = self.api.get_kline_serial(symbol, duration_seconds, data_length=count)
# 转换为前端需要的格式
result = []
for i in range(len(klines)):
result.append({
"time": int(klines.iloc[i]["datetime"].timestamp()),
"open": float(klines.iloc[i]["open"]),
"high": float(klines.iloc[i]["high"]),
"low": float(klines.iloc[i]["low"]),
"close": float(klines.iloc[i]["close"]),
"volume": float(klines.iloc[i]["volume"])
})
return result
except Exception as e:
print(f"获取K线数据失败: {e}")
# 返回空列表,避免整个请求失败
return []
async def get_tick(self, symbol: str) -> Dict[str, Any]:
"""获取tick数据"""
print(f"[TqApiService] 开始获取 tick 数据symbol: {symbol}")
try:
print(f"[TqApiService] 检查连接状态: {self.connected}")
if not self.connected:
# 未连接到天勤服务器,返回默认数据
print("[TqApiService] 未连接到天勤服务器返回默认tick数据")
return {
"last_price": 0,
"pre_close": 0,
"open": 0,
"high": 0,
"low": 0,
"volume": 0,
"open_interest": 0,
"bid_price1": 0,
"bid_volume1": 0,
"ask_price1": 0,
"ask_volume1": 0,
"datetime": int(datetime.datetime.now().timestamp())
}
print(f"[TqApiService] 检查API实例状态: {self.api is not None}")
if not self.api:
# API实例尚未初始化返回默认数据
print("[TqApiService] API实例尚未初始化返回默认tick数据")
return {
"last_price": 0,
"pre_close": 0,
"open": 0,
"high": 0,
"low": 0,
"volume": 0,
"open_interest": 0,
"bid_price1": 0,
"bid_volume1": 0,
"ask_price1": 0,
"ask_volume1": 0,
"datetime": int(datetime.datetime.now().timestamp())
}
# 获取实时行情
print(f"[TqApiService] 正在调用 self.api.get_quote({symbol})...")
quote = self.api.get_quote(symbol)
print(f"[TqApiService] 获取 quote 对象成功")
# 尝试多次获取行情更新,避免单次超时
max_attempts = 3
print(f"[TqApiService] 开始尝试获取行情更新,最多 {max_attempts}")
for attempt in range(max_attempts):
try:
# 等待行情更新,设置超时
print(f"[TqApiService] 尝试 {attempt + 1}/{max_attempts}: 调用 self.api.wait_update(1.0)...")
updated = self.api.wait_update(1.0) # 1秒超时
print(f"[TqApiService] 尝试 {attempt + 1}/{max_attempts}: wait_update 返回: {updated}")
if updated:
print(f"[TqApiService] 获取行情更新成功")
break
print(f"[TqApiService] 尝试 {attempt + 1}/{max_attempts}: 获取 {symbol} 的行情信息超时")
except Exception as e:
print(f"[TqApiService] 尝试 {attempt + 1}/{max_attempts}: 获取 {symbol} 的行情信息出错: {e}")
await asyncio.sleep(0.5)
# 即使没有更新,也尝试返回当前数据
print(f"[TqApiService] 正在构建返回数据...")
result = {
"last_price": float(quote.get("last_price", 0)),
"pre_close": float(quote.get("pre_close", 0)),
"open": float(quote.get("open", 0)),
"high": float(quote.get("high", 0)),
"low": float(quote.get("low", 0)),
"volume": float(quote.get("volume", 0)),
"open_interest": float(quote.get("open_interest", 0)),
"bid_price1": float(quote.get("bid_price1", 0)),
"bid_volume1": float(quote.get("bid_volume1", 0)),
"ask_price1": float(quote.get("ask_price1", 0)),
"ask_volume1": float(quote.get("ask_volume1", 0)),
"datetime": int(quote.get("datetime", datetime.datetime.now()).timestamp())
}
print(f"[TqApiService] 获取 tick 数据成功,返回: {result}")
return result
except Exception as e:
print(f"[TqApiService] 获取tick数据失败: {e}")
import traceback
traceback.print_exc()
# 返回默认数据,避免整个请求失败
default_result = {
"last_price": 0,
"pre_close": 0,
"open": 0,
"high": 0,
"low": 0,
"volume": 0,
"open_interest": 0,
"bid_price1": 0,
"bid_volume1": 0,
"ask_price1": 0,
"ask_volume1": 0,
"datetime": int(datetime.datetime.now().timestamp())
}
print(f"[TqApiService] 返回默认数据: {default_result}")
return default_result
async def disconnect(self) -> bool:
"""断开连接"""
try:
if self.api:
self.connected = False
self.api.close()
self.api = None
return True
except Exception as e:
print(f"断开连接失败: {e}")
return False
def _convert_period(self, period: str) -> int:
"""转换周期格式为秒"""
period_map = {
"1M": 60,
"5M": 300,
"15M": 900,
"30M": 1800,
"1H": 3600,
"4H": 14400,
"1D": 86400
}
return period_map.get(period, 3600) # 默认1小时