You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

103 lines
3.2 KiB

#!/usr/bin/env python3
"""
amazingData SDK 数据获取测试
测试获取 600126 股票的日 K 线数据
"""
import sys
sys.path.insert(0, '/app/working/workspaces/developer/projects/20260330_kline_system/backend')
from datetime import datetime, timedelta
from app.services.amazing_data_service import amazing_data_service
from app.services.amazing_data_adapter import SecurityType
print("=" * 70)
print("amazingData SDK 数据获取测试")
print("=" * 70)
print()
# 测试参数
symbol = "600126" # 杭钢股份
security_type = "EXTRA_STOCK_A" # 沪深 A 股
period = "1d" # 日 K 线
end_date = datetime.now().strftime("%Y%m%d")
start_date = (datetime.now() - timedelta(days=30)).strftime("%Y%m%d")
print(f"📋 测试参数:")
print(f" 股票代码:{symbol}")
print(f" 证券类型:{security_type}")
print(f" 周期:{period}")
print(f" 开始日期:{start_date}")
print(f" 结束日期:{end_date}")
print()
# 连接数据源
print("🔌 正在连接 amazingData 数据源...")
success = amazing_data_service.connect()
if not success:
print("❌ 连接失败!")
sys.exit(1)
print("✅ 连接成功!")
print()
# 获取 K 线数据
print(f"📊 正在获取 {symbol} 的日 K 线数据...")
try:
kline_data = amazing_data_service.get_kline_data(
symbol=symbol,
period=period,
start_date=start_date,
end_date=end_date,
security_type=security_type
)
if kline_data:
print(f"✅ 获取成功!共 {len(kline_data)} 条数据")
print()
# 显示前 5 条数据
print("📈 前 5 条 K 线数据:")
print("-" * 70)
print(f"{'日期':<12} {'开盘':<10} {'最高':<10} {'最低':<10} {'收盘':<10} {'成交量':<12}")
print("-" * 70)
for i, row in enumerate(kline_data[:5]):
# 处理不同的字段名
date = row.get('date', row.get('datetime', row.get('time', '')))
open_price = row.get('open', row.get('open_price', 0))
high = row.get('high', row.get('high_price', 0))
low = row.get('low', row.get('low_price', 0))
close = row.get('close', row.get('close_price', 0))
volume = row.get('volume', row.get('vol', 0))
print(f"{str(date):<12} {open_price:<10.2f} {high:<10.2f} {low:<10.2f} {close:<10.2f} {volume:<12}")
print("-" * 70)
# 数据统计
if len(kline_data) > 0:
print()
print("📊 数据统计:")
latest = kline_data[-1] if len(kline_data) > 0 else kline_data[0]
print(f" 最新日期:{latest.get('date', latest.get('datetime', 'N/A'))}")
print(f" 最新收盘价:{latest.get('close', latest.get('close_price', 'N/A'))}")
print(f" 数据条数:{len(kline_data)}")
else:
print("⚠️ 未获取到数据")
except Exception as e:
print(f"❌ 获取数据失败:{e}")
import traceback
traceback.print_exc()
print()
print("🔌 正在断开连接...")
amazing_data_service.disconnect()
print("✅ 已断开连接")
print()
print("=" * 70)
print("测试完成!")
print("=" * 70)