You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
data_collector/test_amazing_data_adapter.py

749 lines
24 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
AmazingData 数据源适配器 - 完整测试套件
每个接口都有独立的测试用例和入口
"""
import sys
import traceback
from datetime import datetime
from amazing_data_adapter import (
AmazingDataAdapter,
DataSourceConfig,
SecurityType,
Market,
Period,
create_adapter
)
# ============== 配置信息 ==============
USERNAME = '11200008169'
PASSWORD = '11200008169@2026'
HOST = '140.206.44.234'
PORT = 8600
LOCAL_PATH = './amazing_data_cache/'
def create_test_adapter() -> AmazingDataAdapter:
"""创建测试用的适配器实例"""
return create_adapter(
username=USERNAME,
password=PASSWORD,
host=HOST,
port=PORT,
local_path=LOCAL_PATH,
use_local_cache=True
)
def run_test(test_name: str, test_func):
"""运行单个测试并输出结果"""
print(f"\n{'='*60}")
print(f"测试: {test_name}")
print('='*60)
adapter = create_test_adapter()
try:
if not adapter.connect():
print("❌ 连接失败")
return False
test_func(adapter)
print("✅ 测试通过")
return True
except Exception as e:
print(f"❌ 测试失败: {e}")
traceback.print_exc()
return False
finally:
adapter.disconnect()
# ============== 基础数据接口测试 ==============
def test_get_code_list(adapter: AmazingDataAdapter):
"""测试: 获取代码列表"""
print("\n--- 测试获取各类证券代码列表 ---")
# 沪深A股
stock_codes = adapter.get_code_list(SecurityType.STOCK_A)
print(f"沪深A股数量: {len(stock_codes)}")
if stock_codes:
print(f"示例代码: {stock_codes[:5]}")
# ETF
etf_codes = adapter.get_code_list(SecurityType.ETF)
print(f"ETF数量: {len(etf_codes)}")
if etf_codes:
print(f"示例代码: {etf_codes[:3]}")
# 期货
future_codes = adapter.get_code_list(SecurityType.FUTURE)
print(f"期货数量: {len(future_codes)}")
if future_codes:
print(f"示例代码: {future_codes[:3]}")
# 可转债
kzz_codes = adapter.get_code_list(SecurityType.KZZ)
print(f"可转债数量: {len(kzz_codes)}")
if kzz_codes:
print(f"示例代码: {kzz_codes[:3]}")
# 指数
index_codes = adapter.get_code_list(SecurityType.INDEX_A)
print(f"沪深指数数量: {len(index_codes)}")
if index_codes:
print(f"示例代码: {index_codes[:3]}")
def test_get_code_info(adapter: AmazingDataAdapter):
"""测试: 获取证券信息"""
print("\n--- 测试获取证券基本信息 ---")
stock_info = adapter.get_code_info(SecurityType.STOCK_A)
print(f"证券信息形状: {stock_info.shape}")
print(f"列名: {stock_info.columns.tolist()[:10]}")
print("\n前3条数据:")
print(stock_info.head(3))
def test_get_trading_calendar(adapter: AmazingDataAdapter):
"""测试: 获取交易日历"""
print("\n--- 测试获取交易日历 ---")
# 上海市场
sh_calendar = adapter.get_trading_calendar(Market.SH)
print(f"上海市场交易日数量: {len(sh_calendar)}")
print(f"最近5个交易日: {sh_calendar[-5:]}")
# 深圳市场
sz_calendar = adapter.get_trading_calendar(Market.SZ)
print(f"深圳市场交易日数量: {len(sz_calendar)}")
# 北京市场
bj_calendar = adapter.get_trading_calendar(Market.BJ)
print(f"北京市场交易日数量: {len(bj_calendar)}")
def test_get_adj_factor(adapter: AmazingDataAdapter):
"""测试: 获取复权因子"""
print("\n--- 测试获取复权因子 ---")
codes = ['000001.SZ', '600000.SH']
adj_factor = adapter.get_adj_factor(codes, is_local=False)
print(f"复权因子形状: {adj_factor.shape}")
print(f"列名: {adj_factor.columns.tolist()}")
print("\n前5条数据:")
print(adj_factor.head())
def test_get_backward_factor(adapter: AmazingDataAdapter):
"""测试: 获取后复权因子"""
print("\n--- 测试获取后复权因子 ---")
codes = ['000001.SZ', '600000.SH', '000858.SZ']
backward_factor = adapter.get_backward_factor(codes)
print(f"后复权因子形状: {backward_factor.shape}")
print(f"列名: {backward_factor.columns.tolist()}")
print("\n前5条数据:")
print(backward_factor.head())
# ============== 历史行情数据接口测试 ==============
def test_get_kline(adapter: AmazingDataAdapter):
"""测试: 获取历史K线数据"""
print("\n--- 测试获取历史K线数据 ---")
# 获取沪深A股代码列表
stock_codes = adapter.get_code_list(SecurityType.STOCK_A)
sample_codes = stock_codes[:3]
# 日K线
print("\n>>> 日K线数据")
kline_daily = adapter.get_kline(
codes=sample_codes,
start_date='20241201',
end_date='20241231',
period=Period.DAILY
)
for code, df in list(kline_daily.items())[:2]:
print(f"\n{code}:")
print(f" 数据条数: {len(df)}")
print(f" 列名: {df.columns.tolist()}")
if not df.empty:
print(f" 最新数据:\n{df.tail(2)}")
# 分钟K线
print("\n>>> 60分钟K线数据")
kline_min = adapter.get_kline(
codes=['000001.SZ'],
start_date='20241201',
end_date='20241231',
period=Period.MIN60
)
for code, df in kline_min.items():
print(f"\n{code}:")
print(f" 数据条数: {len(df)}")
if not df.empty:
print(f" 前2条数据:\n{df.head(2)}")
def test_get_snapshot(adapter: AmazingDataAdapter):
"""测试: 获取历史快照数据tick级别"""
print("\n--- 测试获取历史快照数据 ---")
snapshot_data = adapter.get_snapshot(
codes=['000001.SZ'],
start_date='20241201',
end_date='20241201' # 只取一天的数据,避免数据量过大
)
print(f"快照数据形状: {snapshot_data}")
for code, df in snapshot_data.items():
print(f"\n{code}:")
print(f" 数据条数: {len(df)}")
if not df.empty:
print(f" 列名: {df.columns.tolist()[:15]}") # 显示前15列
print(f" 前3条数据:\n{df.head(3)}")
else:
print(" 数据为空")
# ============== 财务数据接口测试 ==============
def test_get_balance_sheet(adapter: AmazingDataAdapter):
"""测试: 获取资产负债表"""
print("\n--- 测试获取资产负债表 ---")
codes = ['000001.SZ', '600000.SH']
balance_sheet = adapter.get_balance_sheet(
codes=codes,
start_date=20250101,
end_date=20260311
)
for code, df in balance_sheet.items():
print(f"\n{code}:")
print(f" 数据条数: {len(df)}")
if not df.empty:
print(f" 列名(前10): {df.columns.tolist()[:10]}")
print(f" 数据预览:\n{df.head(2)}")
# 获取最后一个资产负债表并打印最近日期的 TOT_SHARE
print("\n--- 最后一个资产负债表详情 ---")
last_code = list(balance_sheet.keys())[-1]
last_df = balance_sheet[last_code]
if not last_df.empty:
print(f"股票代码: {last_code}")
print(f"资产负债表:\n{last_df}")
if 'TOT_SHARE' in last_df.columns:
# 按 REPORTING_PERIOD 降序排序,获取最近日期的一行
if 'REPORTING_PERIOD' in last_df.columns:
# 输出日期和总股本对应关系
print("\n日期与总股本对应关系:")
for idx, row in last_df.iterrows():
print(f" {row['REPORTING_PERIOD']}: {row['TOT_SHARE']}")
# 获取最近日期的数据
latest_row = last_df.sort_values('REPORTING_PERIOD', ascending=False).iloc[0]
latest_date = latest_row['REPORTING_PERIOD']
else:
# 如果没有 REPORTING_PERIOD 列,取最后一行
print("\n日期与总股本对应关系:")
for idx, row in last_df.iterrows():
print(f" {idx}: {row['TOT_SHARE']}")
latest_row = last_df.iloc[-1]
latest_date = latest_row.name if hasattr(latest_row, 'name') else '未知'
latest_tot_share = latest_row['TOT_SHARE']
print(f"\n>>> 最近报告期 {latest_date} 的总股本: {latest_tot_share}")
else:
print(f"\n可用列: {last_df.columns.tolist()}")
print("未找到 TOT_SHARE 列")
else:
print(f"{last_code} 的资产负债表为空")
def test_get_cash_flow(adapter: AmazingDataAdapter):
"""测试: 获取现金流量表"""
print("\n--- 测试获取现金流量表 ---")
codes = ['000001.SZ', '600000.SH']
cash_flow = adapter.get_cash_flow(
codes=codes,
start_date=20240101,
end_date=20241231
)
for code, df in cash_flow.items():
print(f"\n{code}:")
print(f" 数据条数: {len(df)}")
if not df.empty:
print(f" 列名(前10): {df.columns.tolist()[:10]}")
print(f" 数据预览:\n{df.head(2)}")
def test_get_income_statement(adapter: AmazingDataAdapter):
"""测试: 获取利润表"""
print("\n--- 测试获取利润表 ---")
codes = ['000001.SZ', '600000.SH']
income = adapter.get_income_statement(
codes=codes,
start_date=20240101,
end_date=20241231
)
for code, df in income.items():
print(f"\n{code}:")
print(f" 数据条数: {len(df)}")
if not df.empty:
print(f" 列名(前10): {df.columns.tolist()[:10]}")
print(f" 数据预览:\n{df.head(2)}")
def test_get_profit_express(adapter: AmazingDataAdapter):
"""测试: 获取业绩快报"""
print("\n--- 测试获取业绩快报 ---")
codes = ['000001.SZ', '600000.SH', '000858.SZ']
profit_express = adapter.get_profit_express(
codes=codes,
start_date=20240101,
end_date=20241231
)
print(f"数据形状: {profit_express.shape}")
if not profit_express.empty:
print(f"列名: {profit_express.columns.tolist()}")
print(f"\n前5条数据:\n{profit_express.head()}")
else:
print("数据为空")
def test_get_profit_notice(adapter: AmazingDataAdapter):
"""测试: 获取业绩预告"""
print("\n--- 测试获取业绩预告 ---")
codes = ['000001.SZ', '600000.SH']
profit_notice = adapter.get_profit_notice(
codes=codes,
start_date=20240101,
end_date=20241231
)
print(f"数据形状: {profit_notice.shape}")
if not profit_notice.empty:
print(f"列名: {profit_notice.columns.tolist()}")
print(f"\n前5条数据:\n{profit_notice.head()}")
else:
print("数据为空")
# ============== 股东股本数据接口测试 ==============
def test_get_top10_shareholders(adapter: AmazingDataAdapter):
"""测试: 获取十大股东数据"""
print("\n--- 测试获取十大股东数据 ---")
codes = ['000001.SZ', '600000.SH']
top10 = adapter.get_top10_shareholders(
codes=codes,
start_date=20240101,
end_date=20241231
)
print(f"数据形状: {top10.shape}")
if not top10.empty:
print(f"列名: {top10.columns.tolist()}")
print(f"\n前5条数据:\n{top10.head()}")
else:
print("数据为空")
def test_get_shareholder_count(adapter: AmazingDataAdapter):
"""测试: 获取股东户数数据"""
print("\n--- 测试获取股东户数数据 ---")
codes = ['000001.SZ', '600000.SH']
holder_count = adapter.get_shareholder_count(
codes=codes,
start_date=20240101,
end_date=20241231
)
print(f"数据形状: {holder_count.shape}")
if not holder_count.empty:
print(f"列名: {holder_count.columns.tolist()}")
print(f"\n前5条数据:\n{holder_count.head()}")
else:
print("数据为空")
def test_get_equity_structure(adapter: AmazingDataAdapter):
"""测试: 获取股本结构数据"""
print("\n--- 测试获取股本结构数据 ---")
codes = ['600000.SH']
equity = adapter.get_equity_structure(
codes=codes,
start_date=20250101,
end_date=20251231
)
print(f"数据形状: {equity.shape}")
if not equity.empty:
print(f"列名: {equity.columns.tolist()}")
print(f"\n前5条数据:\n{equity.head()}")
# 按日期和 TOT_A_SHARE 输出
if 'TOT_A_SHARE' in equity.columns:
print("\n日期与流通A股(TOT_A_SHARE)对应关系:")
if 'REPORTING_PERIOD' in equity.columns:
for idx, row in equity.iterrows():
print(f" {row['REPORTING_PERIOD']}: {row['TOT_A_SHARE']}")
# 获取最近日期的数据
latest_row = equity.sort_values('REPORTING_PERIOD', ascending=False).iloc[0]
latest_date = latest_row['REPORTING_PERIOD']
else:
for idx, row in equity.iterrows():
print(f" {idx}: {row['TOT_A_SHARE']}")
latest_row = equity.iloc[-1]
latest_date = latest_row.name if hasattr(latest_row, 'name') else '未知'
print(f"\n>>> 最近报告期 {latest_date} 的流通A股: {latest_row['TOT_A_SHARE']}")
else:
print(f"\n可用列: {equity.columns.tolist()}")
print("未找到 TOT_A_SHARE 列")
else:
print("数据为空")
# ============== 融资融券数据接口测试 ==============
def test_get_margin_summary(adapter: AmazingDataAdapter):
"""测试: 获取融资融券成交汇总"""
print("\n--- 测试获取融资融券成交汇总 ---")
margin_summary = adapter.get_margin_summary(
start_date=20240101,
end_date=20241231
)
print(f"数据形状: {margin_summary.shape}")
if not margin_summary.empty:
print(f"列名: {margin_summary.columns.tolist()}")
print(f"\n前5条数据:\n{margin_summary.head()}")
else:
print("数据为空")
def test_get_margin_detail(adapter: AmazingDataAdapter):
"""测试: 获取融资融券交易明细"""
print("\n--- 测试获取融资融券交易明细 ---")
codes = ['000001.SZ', '600000.SH']
margin_detail = adapter.get_margin_detail(
codes=codes,
start_date=20241201,
end_date=20241231
)
for code, df in margin_detail.items():
print(f"\n{code}:")
print(f" 数据条数: {len(df)}")
if not df.empty:
print(f" 列名: {df.columns.tolist()[:10]}")
print(f" 数据预览:\n{df.head(2)}")
# ============== 交易异动数据接口测试 ==============
def test_get_longhu_bang(adapter: AmazingDataAdapter):
"""测试: 获取龙虎榜数据"""
print("\n--- 测试获取龙虎榜数据 ---")
codes = ['000001.SZ', '600000.SH']
longhu = adapter.get_longhu_bang(
codes=codes,
start_date=20241201,
end_date=20241231
)
print(f"数据形状: {longhu.shape}")
if not longhu.empty:
print(f"列名: {longhu.columns.tolist()}")
print(f"\n前5条数据:\n{longhu.head()}")
else:
print("数据为空")
def test_get_block_trading(adapter: AmazingDataAdapter):
"""测试: 获取大宗交易数据"""
print("\n--- 测试获取大宗交易数据 ---")
codes = ['000001.SZ', '600000.SH']
block_trade = adapter.get_block_trading(
codes=codes,
start_date=20241201,
end_date=20241231
)
print(f"数据形状: {block_trade.shape}")
if not block_trade.empty:
print(f"列名: {block_trade.columns.tolist()}")
print(f"\n前5条数据:\n{block_trade.head()}")
else:
print("数据为空")
# ============== 指数数据接口测试 ==============
def test_get_index_constituents(adapter: AmazingDataAdapter):
"""测试: 获取指数成分股"""
print("\n--- 测试获取指数成分股 ---")
index_codes = ['000300.SH', '000905.SH', '000016.SH']
constituents = adapter.get_index_constituents(index_codes)
for code, df in constituents.items():
print(f"\n{code}:")
print(f" 成分股数量: {len(df)}")
if not df.empty:
print(f" 列名: {df.columns.tolist()}")
print(f" 前5只成分股:\n{df.head()}")
def test_get_index_weights(adapter: AmazingDataAdapter):
"""测试: 获取指数成分股权重"""
print("\n--- 测试获取指数成分股权重 ---")
index_codes = ['000300.SH', '000905.SH']
weights = adapter.get_index_weights(
codes=index_codes,
start_date=20241201,
end_date=20241231
)
for code, df in weights.items():
print(f"\n{code}:")
print(f" 数据条数: {len(df)}")
if not df.empty:
print(f" 列名: {df.columns.tolist()}")
print(f" 权重最大的5只股票:\n{df.nlargest(5, 'WEIGHT') if 'WEIGHT' in df.columns else df.head()}")
# ============== ETF数据接口测试 ==============
def test_get_etf_pcf(adapter: AmazingDataAdapter):
"""测试: 获取ETF申赎数据"""
print("\n--- 测试获取ETF申赎数据 ---")
etf_codes = ['510050.SH', '510300.SH']
etf_info, etf_constituents = adapter.get_etf_pcf(etf_codes)
print(f"ETF基本信息形状: {etf_info.shape}")
if not etf_info.empty:
print(f"列名: {etf_info.columns.tolist()}")
print(f"\nETF基本信息:\n{etf_info.head()}")
print(f"\nETF成分股数据数量: {len(etf_constituents)}")
for code, df in list(etf_constituents.items())[:2]:
print(f"\n{code} 成分股:")
print(f" 数据条数: {len(df)}")
if not df.empty:
print(f" 列名: {df.columns.tolist()[:10]}")
print(f" 前3条:\n{df.head(3)}")
def test_get_fund_share(adapter: AmazingDataAdapter):
"""测试: 获取基金份额数据"""
print("\n--- 测试获取基金份额数据 ---")
codes = ['510050.SH', '510300.SH']
fund_share = adapter.get_fund_share(
codes=codes,
start_date=20240101,
end_date=20241231
)
for code, df in fund_share.items():
print(f"\n{code}:")
print(f" 数据条数: {len(df)}")
if not df.empty:
print(f" 列名: {df.columns.tolist()}")
print(f" 数据预览:\n{df.head()}")
# ============== 可转债数据接口测试 ==============
def test_get_kzz_issuance(adapter: AmazingDataAdapter):
"""测试: 获取可转债发行数据"""
print("\n--- 测试获取可转债发行数据 ---")
kzz_codes = adapter.get_code_list(SecurityType.KZZ)
print(f"可转债数量: {len(kzz_codes)}")
if kzz_codes:
sample_codes = kzz_codes[:5]
print(f"测试代码: {sample_codes}")
kzz_issuance = adapter.get_kzz_issuance(sample_codes)
for code, df in list(kzz_issuance.items())[:3]:
print(f"\n{code}:")
print(f" 数据条数: {len(df)}")
if not df.empty:
print(f" 列名: {df.columns.tolist()[:10]}")
print(f" 数据预览:\n{df.head()}")
# ============== 测试入口 ==============
# 所有测试用例映射
TEST_CASES = {
# 基础数据接口 (5个)
'code_list': test_get_code_list,
'code_info': test_get_code_info,
'trading_calendar': test_get_trading_calendar,
'adj_factor': test_get_adj_factor,
'backward_factor': test_get_backward_factor,
# 历史行情数据接口 (2个)
'kline': test_get_kline,
'snapshot': test_get_snapshot,
# 财务数据接口 (5个)
'balance_sheet': test_get_balance_sheet,
'cash_flow': test_get_cash_flow,
'income_statement': test_get_income_statement,
'profit_express': test_get_profit_express,
'profit_notice': test_get_profit_notice,
# 股东股本数据接口 (3个)
'top10_shareholders': test_get_top10_shareholders,
'shareholder_count': test_get_shareholder_count,
'equity_structure': test_get_equity_structure,
# 融资融券数据接口 (2个)
'margin_summary': test_get_margin_summary,
'margin_detail': test_get_margin_detail,
# 交易异动数据接口 (2个)
'longhu_bang': test_get_longhu_bang,
'block_trading': test_get_block_trading,
# 指数数据接口 (2个)
'index_constituents': test_get_index_constituents,
'index_weights': test_get_index_weights,
# ETF数据接口 (2个)
'etf_pcf': test_get_etf_pcf,
'fund_share': test_get_fund_share,
# 可转债数据接口 (1个)
'kzz_issuance': test_get_kzz_issuance,
}
def print_usage():
"""打印使用说明"""
print("""
使用方法:
python test_amazing_data_adapter.py <test_name>
测试用例列表:
基础数据接口:
code_list - 获取代码列表
code_info - 获取证券信息
trading_calendar - 获取交易日历
adj_factor - 获取复权因子
backward_factor - 获取后复权因子
历史行情数据接口:
kline - 获取历史K线数据
snapshot - 获取历史快照数据
财务数据接口:
balance_sheet - 获取资产负债表
cash_flow - 获取现金流量表
income_statement - 获取利润表
profit_express - 获取业绩快报
profit_notice - 获取业绩预告
股东股本数据接口:
top10_shareholders - 获取十大股东数据
shareholder_count - 获取股东户数数据
equity_structure - 获取股本结构数据
融资融券数据接口:
margin_summary - 获取融资融券成交汇总
margin_detail - 获取融资融券交易明细
交易异动数据接口:
longhu_bang - 获取龙虎榜数据
block_trading - 获取大宗交易数据
指数数据接口:
index_constituents - 获取指数成分股
index_weights - 获取指数成分股权重
ETF数据接口:
etf_pcf - 获取ETF申赎数据
fund_share - 获取基金份额数据
可转债数据接口:
kzz_issuance - 获取可转债发行数据
特殊命令:
all - 运行所有测试
list - 列出所有测试用例
""")
def run_all_tests():
"""运行所有测试"""
print("\n" + "="*60)
print(f"运行所有测试用例: {list(TEST_CASES.keys())}")
print("="*60)
results = {}
for name, func in TEST_CASES.items():
print(f"\n--- 测试: {name} ---")
results[name] = run_test(name, func)
print(f"\n--- 测试: {name} 完成 ---")
# 打印汇总
print("\n" + "="*60)
print("测试结果汇总")
print("="*60)
passed = sum(results.values())
total = len(results)
for name, result in results.items():
status = "✅ 通过" if result else "❌ 失败"
print(f"{name:20s} {status}")
print("-"*60)
print(f"总计: {passed}/{total} 通过")
if __name__ == "__main__":
if len(sys.argv) < 2:
print_usage()
sys.exit(1)
test_name = sys.argv[1].lower()
if test_name == 'list':
print_usage()
elif test_name == 'all':
run_all_tests()
elif test_name in TEST_CASES:
run_test(test_name, TEST_CASES[test_name])
else:
print(f"未知测试用例: {test_name}")
print_usage()
sys.exit(1)