You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
data_collector/test_amazing_data_adapter.py

749 lines
24 KiB

"""
AmazingData 数据源适配器 - 完整测试套件
每个接口都有独立的测试用例和入口
"""
import sys
import traceback
from datetime import datetime
from amazing_data_adapter import (
AmazingDataAdapter,
DataSourceConfig,
SecurityType,
Market,
Period,
create_adapter
)
# ============== 配置信息 ==============
USERNAME = '11200008169'
PASSWORD = '11200008169@2026'
HOST = '140.206.44.234'
PORT = 8600
LOCAL_PATH = './amazing_data_cache/'
def create_test_adapter() -> AmazingDataAdapter:
"""创建测试用的适配器实例"""
return create_adapter(
username=USERNAME,
password=PASSWORD,
host=HOST,
port=PORT,
local_path=LOCAL_PATH,
use_local_cache=True
)
def run_test(test_name: str, test_func):
"""运行单个测试并输出结果"""
print(f"\n{'='*60}")
print(f"测试: {test_name}")
print('='*60)
adapter = create_test_adapter()
try:
if not adapter.connect():
print("❌ 连接失败")
return False
test_func(adapter)
print("✅ 测试通过")
return True
except Exception as e:
print(f"❌ 测试失败: {e}")
traceback.print_exc()
return False
finally:
adapter.disconnect()
# ============== 基础数据接口测试 ==============
def test_get_code_list(adapter: AmazingDataAdapter):
"""测试: 获取代码列表"""
print("\n--- 测试获取各类证券代码列表 ---")
# 沪深A股
stock_codes = adapter.get_code_list(SecurityType.STOCK_A)
print(f"沪深A股数量: {len(stock_codes)}")
if stock_codes:
print(f"示例代码: {stock_codes[:5]}")
# ETF
etf_codes = adapter.get_code_list(SecurityType.ETF)
print(f"ETF数量: {len(etf_codes)}")
if etf_codes:
print(f"示例代码: {etf_codes[:3]}")
# 期货
future_codes = adapter.get_code_list(SecurityType.FUTURE)
print(f"期货数量: {len(future_codes)}")
if future_codes:
print(f"示例代码: {future_codes[:3]}")
# 可转债
kzz_codes = adapter.get_code_list(SecurityType.KZZ)
print(f"可转债数量: {len(kzz_codes)}")
if kzz_codes:
print(f"示例代码: {kzz_codes[:3]}")
# 指数
index_codes = adapter.get_code_list(SecurityType.INDEX_A)
print(f"沪深指数数量: {len(index_codes)}")
if index_codes:
print(f"示例代码: {index_codes[:3]}")
def test_get_code_info(adapter: AmazingDataAdapter):
"""测试: 获取证券信息"""
print("\n--- 测试获取证券基本信息 ---")
stock_info = adapter.get_code_info(SecurityType.STOCK_A)
print(f"证券信息形状: {stock_info.shape}")
print(f"列名: {stock_info.columns.tolist()[:10]}")
print("\n前3条数据:")
print(stock_info.head(3))
def test_get_trading_calendar(adapter: AmazingDataAdapter):
"""测试: 获取交易日历"""
print("\n--- 测试获取交易日历 ---")
# 上海市场
sh_calendar = adapter.get_trading_calendar(Market.SH)
print(f"上海市场交易日数量: {len(sh_calendar)}")
print(f"最近5个交易日: {sh_calendar[-5:]}")
# 深圳市场
sz_calendar = adapter.get_trading_calendar(Market.SZ)
print(f"深圳市场交易日数量: {len(sz_calendar)}")
# 北京市场
bj_calendar = adapter.get_trading_calendar(Market.BJ)
print(f"北京市场交易日数量: {len(bj_calendar)}")
def test_get_adj_factor(adapter: AmazingDataAdapter):
"""测试: 获取复权因子"""
print("\n--- 测试获取复权因子 ---")
codes = ['000001.SZ', '600000.SH']
adj_factor = adapter.get_adj_factor(codes, is_local=False)
print(f"复权因子形状: {adj_factor.shape}")
print(f"列名: {adj_factor.columns.tolist()}")
print("\n前5条数据:")
print(adj_factor.head())
def test_get_backward_factor(adapter: AmazingDataAdapter):
"""测试: 获取后复权因子"""
print("\n--- 测试获取后复权因子 ---")
codes = ['000001.SZ', '600000.SH', '000858.SZ']
backward_factor = adapter.get_backward_factor(codes)
print(f"后复权因子形状: {backward_factor.shape}")
print(f"列名: {backward_factor.columns.tolist()}")
print("\n前5条数据:")
print(backward_factor.head())
# ============== 历史行情数据接口测试 ==============
def test_get_kline(adapter: AmazingDataAdapter):
"""测试: 获取历史K线数据"""
print("\n--- 测试获取历史K线数据 ---")
# 获取沪深A股代码列表
stock_codes = adapter.get_code_list(SecurityType.STOCK_A)
sample_codes = stock_codes[:3]
# 日K线
print("\n>>> 日K线数据")
kline_daily = adapter.get_kline(
codes=sample_codes,
start_date='20241201',
end_date='20241231',
period=Period.DAILY
)
for code, df in list(kline_daily.items())[:2]:
print(f"\n{code}:")
print(f" 数据条数: {len(df)}")
print(f" 列名: {df.columns.tolist()}")
if not df.empty:
print(f" 最新数据:\n{df.tail(2)}")
# 分钟K线
print("\n>>> 60分钟K线数据")
kline_min = adapter.get_kline(
codes=['000001.SZ'],
start_date='20241201',
end_date='20241231',
period=Period.MIN60
)
for code, df in kline_min.items():
print(f"\n{code}:")
print(f" 数据条数: {len(df)}")
if not df.empty:
print(f" 前2条数据:\n{df.head(2)}")
def test_get_snapshot(adapter: AmazingDataAdapter):
"""测试: 获取历史快照数据tick级别"""
print("\n--- 测试获取历史快照数据 ---")
snapshot_data = adapter.get_snapshot(
codes=['000001.SZ'],
start_date='20241201',
end_date='20241201' # 只取一天的数据,避免数据量过大
)
print(f"快照数据形状: {snapshot_data}")
for code, df in snapshot_data.items():
print(f"\n{code}:")
print(f" 数据条数: {len(df)}")
if not df.empty:
print(f" 列名: {df.columns.tolist()[:15]}") # 显示前15列
print(f" 前3条数据:\n{df.head(3)}")
else:
print(" 数据为空")
# ============== 财务数据接口测试 ==============
def test_get_balance_sheet(adapter: AmazingDataAdapter):
"""测试: 获取资产负债表"""
print("\n--- 测试获取资产负债表 ---")
codes = ['000001.SZ', '600000.SH']
balance_sheet = adapter.get_balance_sheet(
codes=codes,
start_date=20250101,
end_date=20260311
)
for code, df in balance_sheet.items():
print(f"\n{code}:")
print(f" 数据条数: {len(df)}")
if not df.empty:
print(f" 列名(前10): {df.columns.tolist()[:10]}")
print(f" 数据预览:\n{df.head(2)}")
# 获取最后一个资产负债表并打印最近日期的 TOT_SHARE
print("\n--- 最后一个资产负债表详情 ---")
last_code = list(balance_sheet.keys())[-1]
last_df = balance_sheet[last_code]
if not last_df.empty:
print(f"股票代码: {last_code}")
print(f"资产负债表:\n{last_df}")
if 'TOT_SHARE' in last_df.columns:
# 按 REPORTING_PERIOD 降序排序,获取最近日期的一行
if 'REPORTING_PERIOD' in last_df.columns:
# 输出日期和总股本对应关系
print("\n日期与总股本对应关系:")
for idx, row in last_df.iterrows():
print(f" {row['REPORTING_PERIOD']}: {row['TOT_SHARE']}")
# 获取最近日期的数据
latest_row = last_df.sort_values('REPORTING_PERIOD', ascending=False).iloc[0]
latest_date = latest_row['REPORTING_PERIOD']
else:
# 如果没有 REPORTING_PERIOD 列,取最后一行
print("\n日期与总股本对应关系:")
for idx, row in last_df.iterrows():
print(f" {idx}: {row['TOT_SHARE']}")
latest_row = last_df.iloc[-1]
latest_date = latest_row.name if hasattr(latest_row, 'name') else '未知'
latest_tot_share = latest_row['TOT_SHARE']
print(f"\n>>> 最近报告期 {latest_date} 的总股本: {latest_tot_share}")
else:
print(f"\n可用列: {last_df.columns.tolist()}")
print("未找到 TOT_SHARE 列")
else:
print(f"{last_code} 的资产负债表为空")
def test_get_cash_flow(adapter: AmazingDataAdapter):
"""测试: 获取现金流量表"""
print("\n--- 测试获取现金流量表 ---")
codes = ['000001.SZ', '600000.SH']
cash_flow = adapter.get_cash_flow(
codes=codes,
start_date=20240101,
end_date=20241231
)
for code, df in cash_flow.items():
print(f"\n{code}:")
print(f" 数据条数: {len(df)}")
if not df.empty:
print(f" 列名(前10): {df.columns.tolist()[:10]}")
print(f" 数据预览:\n{df.head(2)}")
def test_get_income_statement(adapter: AmazingDataAdapter):
"""测试: 获取利润表"""
print("\n--- 测试获取利润表 ---")
codes = ['000001.SZ', '600000.SH']
income = adapter.get_income_statement(
codes=codes,
start_date=20240101,
end_date=20241231
)
for code, df in income.items():
print(f"\n{code}:")
print(f" 数据条数: {len(df)}")
if not df.empty:
print(f" 列名(前10): {df.columns.tolist()[:10]}")
print(f" 数据预览:\n{df.head(2)}")
def test_get_profit_express(adapter: AmazingDataAdapter):
"""测试: 获取业绩快报"""
print("\n--- 测试获取业绩快报 ---")
codes = ['000001.SZ', '600000.SH', '000858.SZ']
profit_express = adapter.get_profit_express(
codes=codes,
start_date=20240101,
end_date=20241231
)
print(f"数据形状: {profit_express.shape}")
if not profit_express.empty:
print(f"列名: {profit_express.columns.tolist()}")
print(f"\n前5条数据:\n{profit_express.head()}")
else:
print("数据为空")
def test_get_profit_notice(adapter: AmazingDataAdapter):
"""测试: 获取业绩预告"""
print("\n--- 测试获取业绩预告 ---")
codes = ['000001.SZ', '600000.SH']
profit_notice = adapter.get_profit_notice(
codes=codes,
start_date=20240101,
end_date=20241231
)
print(f"数据形状: {profit_notice.shape}")
if not profit_notice.empty:
print(f"列名: {profit_notice.columns.tolist()}")
print(f"\n前5条数据:\n{profit_notice.head()}")
else:
print("数据为空")
# ============== 股东股本数据接口测试 ==============
def test_get_top10_shareholders(adapter: AmazingDataAdapter):
"""测试: 获取十大股东数据"""
print("\n--- 测试获取十大股东数据 ---")
codes = ['000001.SZ', '600000.SH']
top10 = adapter.get_top10_shareholders(
codes=codes,
start_date=20240101,
end_date=20241231
)
print(f"数据形状: {top10.shape}")
if not top10.empty:
print(f"列名: {top10.columns.tolist()}")
print(f"\n前5条数据:\n{top10.head()}")
else:
print("数据为空")
def test_get_shareholder_count(adapter: AmazingDataAdapter):
"""测试: 获取股东户数数据"""
print("\n--- 测试获取股东户数数据 ---")
codes = ['000001.SZ', '600000.SH']
holder_count = adapter.get_shareholder_count(
codes=codes,
start_date=20240101,
end_date=20241231
)
print(f"数据形状: {holder_count.shape}")
if not holder_count.empty:
print(f"列名: {holder_count.columns.tolist()}")
print(f"\n前5条数据:\n{holder_count.head()}")
else:
print("数据为空")
def test_get_equity_structure(adapter: AmazingDataAdapter):
"""测试: 获取股本结构数据"""
print("\n--- 测试获取股本结构数据 ---")
codes = ['600000.SH']
equity = adapter.get_equity_structure(
codes=codes,
start_date=20250101,
end_date=20251231
)
print(f"数据形状: {equity.shape}")
if not equity.empty:
print(f"列名: {equity.columns.tolist()}")
print(f"\n前5条数据:\n{equity.head()}")
# 按日期和 TOT_A_SHARE 输出
if 'TOT_A_SHARE' in equity.columns:
print("\n日期与流通A股(TOT_A_SHARE)对应关系:")
if 'REPORTING_PERIOD' in equity.columns:
for idx, row in equity.iterrows():
print(f" {row['REPORTING_PERIOD']}: {row['TOT_A_SHARE']}")
# 获取最近日期的数据
latest_row = equity.sort_values('REPORTING_PERIOD', ascending=False).iloc[0]
latest_date = latest_row['REPORTING_PERIOD']
else:
for idx, row in equity.iterrows():
print(f" {idx}: {row['TOT_A_SHARE']}")
latest_row = equity.iloc[-1]
latest_date = latest_row.name if hasattr(latest_row, 'name') else '未知'
print(f"\n>>> 最近报告期 {latest_date} 的流通A股: {latest_row['TOT_A_SHARE']}")
else:
print(f"\n可用列: {equity.columns.tolist()}")
print("未找到 TOT_A_SHARE 列")
else:
print("数据为空")
# ============== 融资融券数据接口测试 ==============
def test_get_margin_summary(adapter: AmazingDataAdapter):
"""测试: 获取融资融券成交汇总"""
print("\n--- 测试获取融资融券成交汇总 ---")
margin_summary = adapter.get_margin_summary(
start_date=20240101,
end_date=20241231
)
print(f"数据形状: {margin_summary.shape}")
if not margin_summary.empty:
print(f"列名: {margin_summary.columns.tolist()}")
print(f"\n前5条数据:\n{margin_summary.head()}")
else:
print("数据为空")
def test_get_margin_detail(adapter: AmazingDataAdapter):
"""测试: 获取融资融券交易明细"""
print("\n--- 测试获取融资融券交易明细 ---")
codes = ['000001.SZ', '600000.SH']
margin_detail = adapter.get_margin_detail(
codes=codes,
start_date=20241201,
end_date=20241231
)
for code, df in margin_detail.items():
print(f"\n{code}:")
print(f" 数据条数: {len(df)}")
if not df.empty:
print(f" 列名: {df.columns.tolist()[:10]}")
print(f" 数据预览:\n{df.head(2)}")
# ============== 交易异动数据接口测试 ==============
def test_get_longhu_bang(adapter: AmazingDataAdapter):
"""测试: 获取龙虎榜数据"""
print("\n--- 测试获取龙虎榜数据 ---")
codes = ['000001.SZ', '600000.SH']
longhu = adapter.get_longhu_bang(
codes=codes,
start_date=20241201,
end_date=20241231
)
print(f"数据形状: {longhu.shape}")
if not longhu.empty:
print(f"列名: {longhu.columns.tolist()}")
print(f"\n前5条数据:\n{longhu.head()}")
else:
print("数据为空")
def test_get_block_trading(adapter: AmazingDataAdapter):
"""测试: 获取大宗交易数据"""
print("\n--- 测试获取大宗交易数据 ---")
codes = ['000001.SZ', '600000.SH']
block_trade = adapter.get_block_trading(
codes=codes,
start_date=20241201,
end_date=20241231
)
print(f"数据形状: {block_trade.shape}")
if not block_trade.empty:
print(f"列名: {block_trade.columns.tolist()}")
print(f"\n前5条数据:\n{block_trade.head()}")
else:
print("数据为空")
# ============== 指数数据接口测试 ==============
def test_get_index_constituents(adapter: AmazingDataAdapter):
"""测试: 获取指数成分股"""
print("\n--- 测试获取指数成分股 ---")
index_codes = ['000300.SH', '000905.SH', '000016.SH']
constituents = adapter.get_index_constituents(index_codes)
for code, df in constituents.items():
print(f"\n{code}:")
print(f" 成分股数量: {len(df)}")
if not df.empty:
print(f" 列名: {df.columns.tolist()}")
print(f" 前5只成分股:\n{df.head()}")
def test_get_index_weights(adapter: AmazingDataAdapter):
"""测试: 获取指数成分股权重"""
print("\n--- 测试获取指数成分股权重 ---")
index_codes = ['000300.SH', '000905.SH']
weights = adapter.get_index_weights(
codes=index_codes,
start_date=20241201,
end_date=20241231
)
for code, df in weights.items():
print(f"\n{code}:")
print(f" 数据条数: {len(df)}")
if not df.empty:
print(f" 列名: {df.columns.tolist()}")
print(f" 权重最大的5只股票:\n{df.nlargest(5, 'WEIGHT') if 'WEIGHT' in df.columns else df.head()}")
# ============== ETF数据接口测试 ==============
def test_get_etf_pcf(adapter: AmazingDataAdapter):
"""测试: 获取ETF申赎数据"""
print("\n--- 测试获取ETF申赎数据 ---")
etf_codes = ['510050.SH', '510300.SH']
etf_info, etf_constituents = adapter.get_etf_pcf(etf_codes)
print(f"ETF基本信息形状: {etf_info.shape}")
if not etf_info.empty:
print(f"列名: {etf_info.columns.tolist()}")
print(f"\nETF基本信息:\n{etf_info.head()}")
print(f"\nETF成分股数据数量: {len(etf_constituents)}")
for code, df in list(etf_constituents.items())[:2]:
print(f"\n{code} 成分股:")
print(f" 数据条数: {len(df)}")
if not df.empty:
print(f" 列名: {df.columns.tolist()[:10]}")
print(f" 前3条:\n{df.head(3)}")
def test_get_fund_share(adapter: AmazingDataAdapter):
"""测试: 获取基金份额数据"""
print("\n--- 测试获取基金份额数据 ---")
codes = ['510050.SH', '510300.SH']
fund_share = adapter.get_fund_share(
codes=codes,
start_date=20240101,
end_date=20241231
)
for code, df in fund_share.items():
print(f"\n{code}:")
print(f" 数据条数: {len(df)}")
if not df.empty:
print(f" 列名: {df.columns.tolist()}")
print(f" 数据预览:\n{df.head()}")
# ============== 可转债数据接口测试 ==============
def test_get_kzz_issuance(adapter: AmazingDataAdapter):
"""测试: 获取可转债发行数据"""
print("\n--- 测试获取可转债发行数据 ---")
kzz_codes = adapter.get_code_list(SecurityType.KZZ)
print(f"可转债数量: {len(kzz_codes)}")
if kzz_codes:
sample_codes = kzz_codes[:5]
print(f"测试代码: {sample_codes}")
kzz_issuance = adapter.get_kzz_issuance(sample_codes)
for code, df in list(kzz_issuance.items())[:3]:
print(f"\n{code}:")
print(f" 数据条数: {len(df)}")
if not df.empty:
print(f" 列名: {df.columns.tolist()[:10]}")
print(f" 数据预览:\n{df.head()}")
# ============== 测试入口 ==============
# 所有测试用例映射
TEST_CASES = {
# 基础数据接口 (5个)
'code_list': test_get_code_list,
'code_info': test_get_code_info,
'trading_calendar': test_get_trading_calendar,
'adj_factor': test_get_adj_factor,
'backward_factor': test_get_backward_factor,
# 历史行情数据接口 (2个)
'kline': test_get_kline,
'snapshot': test_get_snapshot,
# 财务数据接口 (5个)
'balance_sheet': test_get_balance_sheet,
'cash_flow': test_get_cash_flow,
'income_statement': test_get_income_statement,
'profit_express': test_get_profit_express,
'profit_notice': test_get_profit_notice,
# 股东股本数据接口 (3个)
'top10_shareholders': test_get_top10_shareholders,
'shareholder_count': test_get_shareholder_count,
'equity_structure': test_get_equity_structure,
# 融资融券数据接口 (2个)
'margin_summary': test_get_margin_summary,
'margin_detail': test_get_margin_detail,
# 交易异动数据接口 (2个)
'longhu_bang': test_get_longhu_bang,
'block_trading': test_get_block_trading,
# 指数数据接口 (2个)
'index_constituents': test_get_index_constituents,
'index_weights': test_get_index_weights,
# ETF数据接口 (2个)
'etf_pcf': test_get_etf_pcf,
'fund_share': test_get_fund_share,
# 可转债数据接口 (1个)
'kzz_issuance': test_get_kzz_issuance,
}
def print_usage():
"""打印使用说明"""
print("""
使用方法:
python test_amazing_data_adapter.py <test_name>
测试用例列表:
基础数据接口:
code_list - 获取代码列表
code_info - 获取证券信息
trading_calendar - 获取交易日历
adj_factor - 获取复权因子
backward_factor - 获取后复权因子
历史行情数据接口:
kline - 获取历史K线数据
snapshot - 获取历史快照数据
财务数据接口:
balance_sheet - 获取资产负债表
cash_flow - 获取现金流量表
income_statement - 获取利润表
profit_express - 获取业绩快报
profit_notice - 获取业绩预告
股东股本数据接口:
top10_shareholders - 获取十大股东数据
shareholder_count - 获取股东户数数据
equity_structure - 获取股本结构数据
融资融券数据接口:
margin_summary - 获取融资融券成交汇总
margin_detail - 获取融资融券交易明细
交易异动数据接口:
longhu_bang - 获取龙虎榜数据
block_trading - 获取大宗交易数据
指数数据接口:
index_constituents - 获取指数成分股
index_weights - 获取指数成分股权重
ETF数据接口:
etf_pcf - 获取ETF申赎数据
fund_share - 获取基金份额数据
可转债数据接口:
kzz_issuance - 获取可转债发行数据
特殊命令:
all - 运行所有测试
list - 列出所有测试用例
""")
def run_all_tests():
"""运行所有测试"""
print("\n" + "="*60)
print(f"运行所有测试用例: {list(TEST_CASES.keys())}")
print("="*60)
results = {}
for name, func in TEST_CASES.items():
print(f"\n--- 测试: {name} ---")
results[name] = run_test(name, func)
print(f"\n--- 测试: {name} 完成 ---")
# 打印汇总
print("\n" + "="*60)
print("测试结果汇总")
print("="*60)
passed = sum(results.values())
total = len(results)
for name, result in results.items():
status = "✅ 通过" if result else "❌ 失败"
print(f"{name:20s} {status}")
print("-"*60)
print(f"总计: {passed}/{total} 通过")
if __name__ == "__main__":
if len(sys.argv) < 2:
print_usage()
sys.exit(1)
test_name = sys.argv[1].lower()
if test_name == 'list':
print_usage()
elif test_name == 'all':
run_all_tests()
elif test_name in TEST_CASES:
run_test(test_name, TEST_CASES[test_name])
else:
print(f"未知测试用例: {test_name}")
print_usage()
sys.exit(1)