"""测试扩展字段功能 测试内容包括: 1. 获取日K线数据(包含新字段) 2. 验证新字段数据是否正确 使用方法: python scripts/test_extended_fields.py """ import os import sys import asyncio # 添加项目根目录到路径 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from datetime import datetime, timedelta from app.adapters.amazingdata_adapter import AmazingDataAdapter from app.core.logger import info, error async def test_fetch_daily_klines(): """测试获取日K线数据(包含扩展字段)""" adapter = AmazingDataAdapter() await adapter.connect({ "username": os.getenv("AMAZINGDATA_USERNAME", ""), "password": os.getenv("AMAZINGDATA_PASSWORD", ""), "host": os.getenv("AMAZINGDATA_HOST", ""), "port": int(os.getenv("AMAZINGDATA_PORT", "8600")), "local_path": "./amazing_data_cache/", "use_local_cache": True }) # 测试股票: 平安银行 000001.SZ symbol = "000001.SZ" # 获取最近5个交易日的数据 end_date = datetime.now() start_date = end_date - timedelta(days=30) # 获取最近30天的数据 start_str = start_date.strftime("%Y%m%d") end_str = end_date.strftime("%Y%m%d") info(f"\n测试获取 {symbol} 的日K线数据 ({start_str} - {end_str})") try: klines = await adapter.fetch_klines( symbol=symbol, start=start_str, end=end_str, freq="1d" ) if not klines: error(f"未获取到 {symbol} 的K线数据") return False info(f"成功获取 {len(klines)} 条K线数据\n") # 打印第一条数据的详细信息 first_kline = klines[0] info("第一条K线数据:") info(f" 日期: {datetime.fromtimestamp(first_kline.time).strftime('%Y-%m-%d')}") info(f" 开盘价: {first_kline.open}") info(f" 最高价: {first_kline.high}") info(f" 最低价: {first_kline.low}") info(f" 收盘价: {first_kline.close}") info(f" 成交量: {first_kline.volume}") info(f" 成交额: {first_kline.amount}") info("") info("扩展字段:") info(f" 交易日: {first_kline.trade_date}") return True except Exception as e: error(f"测试失败: {e}") import traceback error(traceback.format_exc()) return False finally: await adapter.close() async def test_save_and_read(): """测试保存和读取包含扩展字段的K线数据""" from sqlalchemy.orm import Session from app.repositories.database import SessionLocal from app.repositories.stock_repository import StockRepository from app.models.types import KLineItem, Frequency from datetime import datetime info("\n\n测试保存和读取包含扩展字段的K线数据...") db = SessionLocal() try: repo = StockRepository(db) # 创建测试数据 test_items = [ KLineItem( time=datetime(2024, 1, 15, 0, 0, 0), open=10.0, high=11.0, low=9.5, close=10.8, volume=100000, amount=1080000.0, trade_date="2024-01-15", is_limit_up=True, is_limit_down=False, total_market_cap=1500000000.0, float_market_cap=1200000000.0, inst_holding_ratio=25.5, trading_days=5000 ) ] # 使用 object.__setattr__ 绕过 Pydantic 验证添加 symbol 属性 for item in test_items: object.__setattr__(item, 'symbol', "000001.SZ") # 保存数据 repo.save_klines(Frequency.FREQ_1D, test_items) info("测试数据保存成功") # 读取数据 items = repo.get_klines( symbol="000001.SZ", freq=Frequency.FREQ_1D, start=datetime(2024, 1, 1), end=datetime(2024, 1, 31) ) if items: item = items[0] info(f"\n从数据库读取的数据:") info(f" 日期: {item.time}") info(f" 收盘价: {item.close}") info(f" 交易日: {item.trade_date}") info(f" 是否涨停: {item.is_limit_up}") info(f" 是否跌停: {item.is_limit_down}") info(f" 总市值: {item.total_market_cap}") info(f" 流通市值: {item.float_market_cap}") info(f" 机构持仓占比: {item.inst_holding_ratio}") info(f" 可交易日数: {item.trading_days}") return True else: error("未读取到数据") return False except Exception as e: error(f"测试失败: {e}") import traceback error(traceback.format_exc()) return False finally: db.close() async def main(): """主函数""" info("=" * 60) info("开始测试扩展字段功能") info("=" * 60) # 测试1: 获取日K线数据 success1 = await test_fetch_daily_klines() # 测试2: 保存和读取数据 success2 = await test_save_and_read() info("\n" + "=" * 60) if success1 and success2: info("所有测试通过!") else: error("部分测试失败") info("=" * 60) if __name__ == "__main__": asyncio.run(main())