You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

330 lines
13 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import asyncio
import datetime
from tqsdk import TqApi, TqAuth
from typing import List, Dict, Any
class TqApiService:
# 类级缓存存储用户名密码对应的TqApiService实例
_instance_cache = {}
def __init__(self):
self.api = None
self.connected = False
self.username = None
self.password = None
@classmethod
async def get_instance(cls, username: str, password: str):
"""获取或创建TqApiService实例"""
# 生成缓存键
cache_key = f"{username}:{password}"
# 检查缓存中是否存在实例
if cache_key in cls._instance_cache:
instance = cls._instance_cache[cache_key]
# 检查实例是否仍然连接
if instance.connected:
print(f"使用缓存的TqApiService实例 for {username}")
return instance
else:
# 实例已断开连接,从缓存中移除
print(f"TqApiService实例 for {username} 已断开连接,从缓存中移除")
del cls._instance_cache[cache_key]
# 创建新实例
print(f"创建新的TqApiService实例 for {username}")
instance = cls()
instance.username = username
instance.password = password
# 连接到天勤服务器
await instance.connect(username, password)
# 存入缓存
cls._instance_cache[cache_key] = instance
return instance
async def connect(self, username: str, password: str) -> bool:
"""连接到天勤服务器"""
try:
# 立即返回不等待TqApi实例完全连接到天勤服务器
# 连接过程将在后台进行
self.username = username
self.password = password
self.connected = True
# 创建后台任务来初始化API实例和运行事件循环
asyncio.create_task(self._initialize_api())
return True
except Exception as e:
print(f"连接失败: {e}")
self.connected = False
return False
async def _initialize_api(self):
"""初始化TqApi实例并运行事件循环"""
try:
print("正在初始化TqApi实例...")
# 创建TqApi实例
self.api = TqApi(auth=TqAuth(self.username, self.password), debug=False)
print("TqApi实例初始化完成")
# 运行事件循环
await self._run_api()
except Exception as e:
print(f"初始化TqApi实例失败: {e}")
self.connected = False
async def _run_api(self):
"""运行TQAPI事件循环"""
try:
while self.connected and self.api:
try:
# 非阻塞方式检查更新
self.api.wait_update(0.1) # 超时0.1秒,避免完全阻塞
except Exception as e:
print(f"API更新出错: {e}")
await asyncio.sleep(0.1)
# 让出CPU时间避免阻塞其他任务
await asyncio.sleep(0.01)
except Exception as e:
print(f"API运行出错: {e}")
self.connected = False
async def get_contracts(self) -> List[Dict[str, Any]]:
"""获取合约列表"""
if not self.connected:
raise Exception("未连接到天勤服务器")
if not self.api:
# API实例尚未初始化返回空列表
print("API实例尚未初始化返回空合约列表")
return []
try:
# 获取所有合约
contracts = self.api.get_contracts()
# 过滤期货合约,排除期权等其他类型
futures_contracts = []
for symbol, contract in contracts.items():
# 只保留期货合约
if contract["product_class"] == "FUTURE":
futures_contracts.append({
"symbol": symbol,
"name": contract["name"],
"exchange": contract["exchange"],
"product_id": contract["product_id"],
"price_tick": contract["price_tick"],
"volume_multiple": contract["volume_multiple"],
"margin_rate": contract["margin_rate"],
"expire_datetime": contract["expire_datetime"]
})
return futures_contracts
except Exception as e:
print(f"获取合约列表失败: {e}")
# 返回空列表,避免整个请求失败
return []
async def get_contract(self, symbol: str) -> Dict[str, Any]:
"""获取合约详情"""
if not self.connected:
raise Exception("未连接到天勤服务器")
if not self.api:
# API实例尚未初始化返回默认值
print("API实例尚未初始化返回默认合约详情")
return {
"symbol": symbol,
"name": "",
"exchange": "",
"product_id": "",
"price_tick": 0,
"volume_multiple": 0,
"margin_rate": 0,
"expire_datetime": 0
}
try:
# 获取合约详情
contract = self.api.get_contract(symbol)
return {
"symbol": symbol,
"name": contract["name"],
"exchange": contract["exchange"],
"product_id": contract["product_id"],
"price_tick": contract["price_tick"],
"volume_multiple": contract["volume_multiple"],
"margin_rate": contract["margin_rate"],
"expire_datetime": contract["expire_datetime"]
}
except Exception as e:
print(f"获取合约详情失败: {e}")
# 返回默认值,避免整个请求失败
return {
"symbol": symbol,
"name": "",
"exchange": "",
"product_id": "",
"price_tick": 0,
"volume_multiple": 0,
"margin_rate": 0,
"expire_datetime": 0
}
async def get_klines(self, symbol: str, period: str, count: int) -> List[Dict[str, Any]]:
"""获取K线数据"""
if not self.connected:
raise Exception("未连接到天勤服务器")
if not self.api:
# API实例尚未初始化返回空列表
print("API实例尚未初始化返回空K线数据列表")
return []
try:
# 转换周期格式
duration_seconds = self._convert_period(period)
# 获取K线数据
klines = self.api.get_kline_serial(symbol, duration_seconds, data_length=count)
# 转换为前端需要的格式
result = []
for i in range(len(klines)):
result.append({
"time": int(klines.iloc[i]["datetime"].timestamp()),
"open": float(klines.iloc[i]["open"]),
"high": float(klines.iloc[i]["high"]),
"low": float(klines.iloc[i]["low"]),
"close": float(klines.iloc[i]["close"]),
"volume": float(klines.iloc[i]["volume"])
})
return result
except Exception as e:
print(f"获取K线数据失败: {e}")
# 返回空列表,避免整个请求失败
return []
async def get_tick(self, symbol: str) -> Dict[str, Any]:
"""获取tick数据"""
print(f"[TqApiService] 开始获取 tick 数据symbol: {symbol}")
try:
print(f"[TqApiService] 检查连接状态: {self.connected}")
if not self.connected:
# 未连接到天勤服务器,返回默认数据
print("[TqApiService] 未连接到天勤服务器返回默认tick数据")
return {
"last_price": 0,
"pre_close": 0,
"open": 0,
"high": 0,
"low": 0,
"volume": 0,
"open_interest": 0,
"bid_price1": 0,
"bid_volume1": 0,
"ask_price1": 0,
"ask_volume1": 0,
"datetime": int(datetime.datetime.now().timestamp())
}
print(f"[TqApiService] 检查API实例状态: {self.api is not None}")
if not self.api:
# API实例尚未初始化返回默认数据
print("[TqApiService] API实例尚未初始化返回默认tick数据")
return {
"last_price": 0,
"pre_close": 0,
"open": 0,
"high": 0,
"low": 0,
"volume": 0,
"open_interest": 0,
"bid_price1": 0,
"bid_volume1": 0,
"ask_price1": 0,
"ask_volume1": 0,
"datetime": int(datetime.datetime.now().timestamp())
}
# 获取实时行情
print(f"[TqApiService] 正在调用 self.api.get_quote({symbol})...")
quote = self.api.get_quote(symbol)
print(f"[TqApiService] 获取 quote 对象成功")
# 尝试多次获取行情更新,避免单次超时
max_attempts = 3
print(f"[TqApiService] 开始尝试获取行情更新,最多 {max_attempts}")
for attempt in range(max_attempts):
try:
# 等待行情更新,设置超时
print(f"[TqApiService] 尝试 {attempt + 1}/{max_attempts}: 调用 self.api.wait_update(1.0)...")
updated = self.api.wait_update(1.0) # 1秒超时
print(f"[TqApiService] 尝试 {attempt + 1}/{max_attempts}: wait_update 返回: {updated}")
if updated:
print(f"[TqApiService] 获取行情更新成功")
break
print(f"[TqApiService] 尝试 {attempt + 1}/{max_attempts}: 获取 {symbol} 的行情信息超时")
except Exception as e:
print(f"[TqApiService] 尝试 {attempt + 1}/{max_attempts}: 获取 {symbol} 的行情信息出错: {e}")
await asyncio.sleep(0.5)
# 即使没有更新,也尝试返回当前数据
print(f"[TqApiService] 正在构建返回数据...")
result = {
"last_price": float(quote.get("last_price", 0)),
"pre_close": float(quote.get("pre_close", 0)),
"open": float(quote.get("open", 0)),
"high": float(quote.get("high", 0)),
"low": float(quote.get("low", 0)),
"volume": float(quote.get("volume", 0)),
"open_interest": float(quote.get("open_interest", 0)),
"bid_price1": float(quote.get("bid_price1", 0)),
"bid_volume1": float(quote.get("bid_volume1", 0)),
"ask_price1": float(quote.get("ask_price1", 0)),
"ask_volume1": float(quote.get("ask_volume1", 0)),
"datetime": int(quote.get("datetime", datetime.datetime.now()).timestamp())
}
print(f"[TqApiService] 获取 tick 数据成功,返回: {result}")
return result
except Exception as e:
print(f"[TqApiService] 获取tick数据失败: {e}")
import traceback
traceback.print_exc()
# 返回默认数据,避免整个请求失败
default_result = {
"last_price": 0,
"pre_close": 0,
"open": 0,
"high": 0,
"low": 0,
"volume": 0,
"open_interest": 0,
"bid_price1": 0,
"bid_volume1": 0,
"ask_price1": 0,
"ask_volume1": 0,
"datetime": int(datetime.datetime.now().timestamp())
}
print(f"[TqApiService] 返回默认数据: {default_result}")
return default_result
async def disconnect(self) -> bool:
"""断开连接"""
try:
if self.api:
self.connected = False
self.api.close()
self.api = None
return True
except Exception as e:
print(f"断开连接失败: {e}")
return False
def _convert_period(self, period: str) -> int:
"""转换周期格式为秒"""
period_map = {
"1M": 60,
"5M": 300,
"15M": 900,
"30M": 1800,
"1H": 3600,
"4H": 14400,
"1D": 86400
}
return period_map.get(period, 3600) # 默认1小时