You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

77 lines
2.3 KiB

#!/usr/bin/env python3
"""
amazingData SDK 连接测试脚本
测试与银河证券星耀数智量化平台的连接
"""
import sys
import os
# 添加项目路径
sys.path.insert(0, '/app/working/workspaces/developer/projects/20260330_kline_system/backend')
from app.config import settings
from app.services.amazing_data_service import amazing_data_service
def test_connection():
"""测试 SDK 连接"""
print("=" * 60)
print("amazingData SDK 连接测试")
print("=" * 60)
print()
# 显示配置
print("📋 配置信息:")
print(f" Host: {settings.AMAZING_DATA_HOST}")
print(f" Port: {settings.AMAZING_DATA_PORT}")
print(f" Account: {settings.AMAZING_DATA_ACCOUNT}")
print(f" Environment: {settings.AMAZING_DATA_ENV}")
print()
# 测试连接
print("🔌 正在连接 amazingData 数据源...")
try:
success = amazing_data_service.connect()
if success:
print("✅ 连接成功!")
print()
# 测试获取数据
print("📊 测试获取数据...")
try:
# 获取期货代码列表
codes = amazing_data_service.get_security_codes(security_type="EXTRA_FUTURE")
if codes:
print(f"✅ 获取期货代码列表成功,共 {len(codes)} 个品种")
print(f" 示例:{codes[:5] if len(codes) >= 5 else codes}")
else:
print("⚠️ 代码列表为空")
except Exception as e:
print(f"❌ 获取数据失败:{e}")
# 断开连接
print()
print("🔌 正在断开连接...")
amazing_data_service.disconnect()
print("✅ 已断开连接")
return True
else:
print("❌ 连接失败")
return False
except Exception as e:
print(f"❌ 连接异常:{e}")
import traceback
traceback.print_exc()
return False
if __name__ == "__main__":
success = test_connection()
print()
print("=" * 60)
if success:
print("🎉 测试通过amazingData SDK 集成成功!")
else:
print("⚠️ 测试失败,请检查网络和账号配置")
print("=" * 60)
sys.exit(0 if success else 1)