You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

280 lines
11 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import asyncio
import datetime
from tqsdk import TqApi, TqAuth
from typing import List, Dict, Any
class TqApiService:
def __init__(self):
self.api = None
self.connected = False
async def connect(self, username: str, password: str) -> bool:
"""连接到天勤服务器"""
try:
# 立即返回不等待TqApi实例完全连接到天勤服务器
# 连接过程将在后台进行
self.username = username
self.password = password
self.connected = True
# 创建后台任务来初始化API实例和运行事件循环
asyncio.create_task(self._initialize_api())
return True
except Exception as e:
print(f"连接失败: {e}")
self.connected = False
return False
async def _initialize_api(self):
"""初始化TqApi实例并运行事件循环"""
try:
print("正在初始化TqApi实例...")
# 创建TqApi实例
self.api = TqApi(auth=TqAuth(self.username, self.password), debug=False)
print("TqApi实例初始化完成")
# 运行事件循环
await self._run_api()
except Exception as e:
print(f"初始化TqApi实例失败: {e}")
self.connected = False
async def _run_api(self):
"""运行TQAPI事件循环"""
try:
while self.connected and self.api:
try:
# 非阻塞方式检查更新
self.api.wait_update(0.1) # 超时0.1秒,避免完全阻塞
except Exception as e:
print(f"API更新出错: {e}")
await asyncio.sleep(0.1)
# 让出CPU时间避免阻塞其他任务
await asyncio.sleep(0.01)
except Exception as e:
print(f"API运行出错: {e}")
self.connected = False
async def get_contracts(self) -> List[Dict[str, Any]]:
"""获取合约列表"""
if not self.connected:
raise Exception("未连接到天勤服务器")
if not self.api:
# API实例尚未初始化返回空列表
print("API实例尚未初始化返回空合约列表")
return []
try:
# 获取所有合约
contracts = self.api.get_contracts()
# 过滤期货合约,排除期权等其他类型
futures_contracts = []
for symbol, contract in contracts.items():
# 只保留期货合约
if contract["product_class"] == "FUTURE":
futures_contracts.append({
"symbol": symbol,
"name": contract["name"],
"exchange": contract["exchange"],
"product_id": contract["product_id"],
"price_tick": contract["price_tick"],
"volume_multiple": contract["volume_multiple"],
"margin_rate": contract["margin_rate"],
"expire_datetime": contract["expire_datetime"]
})
return futures_contracts
except Exception as e:
print(f"获取合约列表失败: {e}")
# 返回空列表,避免整个请求失败
return []
async def get_contract(self, symbol: str) -> Dict[str, Any]:
"""获取合约详情"""
if not self.connected:
raise Exception("未连接到天勤服务器")
if not self.api:
# API实例尚未初始化返回默认值
print("API实例尚未初始化返回默认合约详情")
return {
"symbol": symbol,
"name": "",
"exchange": "",
"product_id": "",
"price_tick": 0,
"volume_multiple": 0,
"margin_rate": 0,
"expire_datetime": 0
}
try:
# 获取合约详情
contract = self.api.get_contract(symbol)
return {
"symbol": symbol,
"name": contract["name"],
"exchange": contract["exchange"],
"product_id": contract["product_id"],
"price_tick": contract["price_tick"],
"volume_multiple": contract["volume_multiple"],
"margin_rate": contract["margin_rate"],
"expire_datetime": contract["expire_datetime"]
}
except Exception as e:
print(f"获取合约详情失败: {e}")
# 返回默认值,避免整个请求失败
return {
"symbol": symbol,
"name": "",
"exchange": "",
"product_id": "",
"price_tick": 0,
"volume_multiple": 0,
"margin_rate": 0,
"expire_datetime": 0
}
async def get_klines(self, symbol: str, period: str, count: int) -> List[Dict[str, Any]]:
"""获取K线数据"""
if not self.connected:
raise Exception("未连接到天勤服务器")
if not self.api:
# API实例尚未初始化返回空列表
print("API实例尚未初始化返回空K线数据列表")
return []
try:
# 转换周期格式
duration_seconds = self._convert_period(period)
# 获取K线数据
klines = self.api.get_kline_serial(symbol, duration_seconds, data_length=count)
# 转换为前端需要的格式
result = []
for i in range(len(klines)):
result.append({
"time": int(klines.iloc[i]["datetime"].timestamp()),
"open": float(klines.iloc[i]["open"]),
"high": float(klines.iloc[i]["high"]),
"low": float(klines.iloc[i]["low"]),
"close": float(klines.iloc[i]["close"]),
"volume": float(klines.iloc[i]["volume"])
})
return result
except Exception as e:
print(f"获取K线数据失败: {e}")
# 返回空列表,避免整个请求失败
return []
async def get_tick(self, symbol: str) -> Dict[str, Any]:
"""获取tick数据"""
try:
if not self.connected:
# 未连接到天勤服务器,返回默认数据
print("未连接到天勤服务器返回默认tick数据")
return {
"last_price": 0,
"pre_close": 0,
"open": 0,
"high": 0,
"low": 0,
"volume": 0,
"open_interest": 0,
"bid_price1": 0,
"bid_volume1": 0,
"ask_price1": 0,
"ask_volume1": 0,
"datetime": int(datetime.datetime.now().timestamp())
}
if not self.api:
# API实例尚未初始化返回默认数据
print("API实例尚未初始化返回默认tick数据")
return {
"last_price": 0,
"pre_close": 0,
"open": 0,
"high": 0,
"low": 0,
"volume": 0,
"open_interest": 0,
"bid_price1": 0,
"bid_volume1": 0,
"ask_price1": 0,
"ask_volume1": 0,
"datetime": int(datetime.datetime.now().timestamp())
}
# 获取实时行情
quote = self.api.get_quote(symbol)
# 尝试多次获取行情更新,避免单次超时
max_attempts = 3
for attempt in range(max_attempts):
try:
# 等待行情更新,设置超时
updated = self.api.wait_update(1.0) # 1秒超时
if updated:
break
print(f"尝试 {attempt + 1}/{max_attempts}: 获取 {symbol} 的行情信息超时")
except Exception as e:
print(f"尝试 {attempt + 1}/{max_attempts}: 获取 {symbol} 的行情信息出错: {e}")
await asyncio.sleep(0.5)
# 即使没有更新,也尝试返回当前数据
return {
"last_price": float(quote.get("last_price", 0)),
"pre_close": float(quote.get("pre_close", 0)),
"open": float(quote.get("open", 0)),
"high": float(quote.get("high", 0)),
"low": float(quote.get("low", 0)),
"volume": float(quote.get("volume", 0)),
"open_interest": float(quote.get("open_interest", 0)),
"bid_price1": float(quote.get("bid_price1", 0)),
"bid_volume1": float(quote.get("bid_volume1", 0)),
"ask_price1": float(quote.get("ask_price1", 0)),
"ask_volume1": float(quote.get("ask_volume1", 0)),
"datetime": int(quote.get("datetime", datetime.datetime.now()).timestamp())
}
except Exception as e:
print(f"获取tick数据失败: {e}")
# 返回默认数据,避免整个请求失败
return {
"last_price": 0,
"pre_close": 0,
"open": 0,
"high": 0,
"low": 0,
"volume": 0,
"open_interest": 0,
"bid_price1": 0,
"bid_volume1": 0,
"ask_price1": 0,
"ask_volume1": 0,
"datetime": int(datetime.datetime.now().timestamp())
}
async def disconnect(self) -> bool:
"""断开连接"""
try:
if self.api:
self.connected = False
self.api.close()
self.api = None
return True
except Exception as e:
print(f"断开连接失败: {e}")
return False
def _convert_period(self, period: str) -> int:
"""转换周期格式为秒"""
period_map = {
"1M": 60,
"5M": 300,
"15M": 900,
"30M": 1800,
"1H": 3600,
"4H": 14400,
"1D": 86400
}
return period_map.get(period, 3600) # 默认1小时