#!/usr/bin/env python3 """ amazingData SDK 数据获取测试 测试获取 600126 股票的日 K 线数据 """ import sys sys.path.insert(0, '/app/working/workspaces/developer/projects/20260330_kline_system/backend') from datetime import datetime, timedelta from app.services.amazing_data_service import amazing_data_service from app.services.amazing_data_adapter import SecurityType print("=" * 70) print("amazingData SDK 数据获取测试") print("=" * 70) print() # 测试参数 symbol = "600126" # 杭钢股份 security_type = "EXTRA_STOCK_A" # 沪深 A 股 period = "1d" # 日 K 线 end_date = datetime.now().strftime("%Y%m%d") start_date = (datetime.now() - timedelta(days=30)).strftime("%Y%m%d") print(f"📋 测试参数:") print(f" 股票代码:{symbol}") print(f" 证券类型:{security_type}") print(f" 周期:{period}") print(f" 开始日期:{start_date}") print(f" 结束日期:{end_date}") print() # 连接数据源 print("🔌 正在连接 amazingData 数据源...") success = amazing_data_service.connect() if not success: print("❌ 连接失败!") sys.exit(1) print("✅ 连接成功!") print() # 获取 K 线数据 print(f"📊 正在获取 {symbol} 的日 K 线数据...") try: kline_data = amazing_data_service.get_kline_data( symbol=symbol, period=period, start_date=start_date, end_date=end_date, security_type=security_type ) if kline_data: print(f"✅ 获取成功!共 {len(kline_data)} 条数据") print() # 显示前 5 条数据 print("📈 前 5 条 K 线数据:") print("-" * 70) print(f"{'日期':<12} {'开盘':<10} {'最高':<10} {'最低':<10} {'收盘':<10} {'成交量':<12}") print("-" * 70) for i, row in enumerate(kline_data[:5]): # 处理不同的字段名 date = row.get('date', row.get('datetime', row.get('time', ''))) open_price = row.get('open', row.get('open_price', 0)) high = row.get('high', row.get('high_price', 0)) low = row.get('low', row.get('low_price', 0)) close = row.get('close', row.get('close_price', 0)) volume = row.get('volume', row.get('vol', 0)) print(f"{str(date):<12} {open_price:<10.2f} {high:<10.2f} {low:<10.2f} {close:<10.2f} {volume:<12}") print("-" * 70) # 数据统计 if len(kline_data) > 0: print() print("📊 数据统计:") latest = kline_data[-1] if len(kline_data) > 0 else kline_data[0] print(f" 最新日期:{latest.get('date', latest.get('datetime', 'N/A'))}") print(f" 最新收盘价:{latest.get('close', latest.get('close_price', 'N/A'))}") print(f" 数据条数:{len(kline_data)}") else: print("⚠️ 未获取到数据") except Exception as e: print(f"❌ 获取数据失败:{e}") import traceback traceback.print_exc() print() print("🔌 正在断开连接...") amazing_data_service.disconnect() print("✅ 已断开连接") print() print("=" * 70) print("测试完成!") print("=" * 70)