feat: 增加初始化代码

master
Lxy 3 months ago
commit 25a5e7447e

@ -0,0 +1,922 @@
# AmazingData 数据源适配器 - 完整文档
## 目录
1. [适配器代码](#一适配器代码)
2. [接口详细说明](#二接口详细说明)
3. [使用示例](#三使用示例)
4. [数据结构说明](#四数据结构说明)
5. [注意事项](#五注意事项)
---
## 一、适配器代码
### 1. 主适配器代码 (amazing_data_adapter.py)
```python
"""
AmazingData 数据源适配器
基于银河证券星耀数智量化平台 SDK 的封装
提供统一、简洁的金融数据获取接口
"""
import pandas as pd
from typing import List, Dict, Optional, Union, Tuple
from datetime import datetime, date
from dataclasses import dataclass
from enum import Enum
import logging
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class SecurityType(Enum):
    """Security categories; each value is the string passed to the SDK as ``security_type``."""
    STOCK_A = "EXTRA_STOCK_A"  # Shanghai/Shenzhen A-shares
    # A-shares, SH/SZ variant — NOTE(review): how this differs from STOCK_A is not
    # visible here (original note: "沪深A股沪深"); confirm against the SDK manual.
    STOCK_A_SH_SZ = "EXTRA_STOCK_A_SH_SZ"
    INDEX_A = "EXTRA_INDEX_A"  # Shanghai/Shenzhen indices
    ETF = "EXTRA_ETF"  # ETFs
    FUTURE = "EXTRA_FUTURE"  # futures
    KZZ = "EXTRA_KZZ"  # convertible bonds
    GLRA = "EXTRA_GLRA"  # reverse repos
    HKT = "EXTRA_HKT"  # Hong Kong Stock Connect
    ETF_OP = "EXTRA_ETF_OP"  # ETF options
class Market(Enum):
    """Exchange identifiers; the value is passed to the SDK as ``market``."""
    SH = "SH"  # Shanghai
    SZ = "SZ"  # Shenzhen
    BJ = "BJ"  # Beijing
class Period(Enum):
    """K-line bar periods accepted by ``AmazingDataAdapter.get_kline``.

    Each value is the attribute name that ``get_kline`` looks up on the SDK's
    ``constant.Period``. The member set mirrors the ``Period`` enum shipped in
    ``amazing_data_adapter.py`` so this documented copy and the module agree;
    the original members keep their names, values and relative order.
    """
    MIN1 = "min1"
    MIN3 = "min3"
    MIN5 = "min5"
    MIN10 = "min10"
    MIN15 = "min15"
    MIN30 = "min30"
    MIN60 = "min60"
    MIN120 = "min120"
    DAILY = "daily"
    WEEKLY = "weekly"
    MONTHLY = "monthly"
    SEASON = "season"
    YEAR = "year"
@dataclass
class DataSourceConfig:
    """Connection and cache settings consumed by ``AmazingDataAdapter``."""
    # Credentials and endpoint forwarded to the SDK's login() call.
    username: str
    password: str
    host: str
    port: int
    # Directory handed to the SDK as ``local_path`` on cache-aware queries.
    local_path: str = "./amazing_data_cache/"
    # Default for ``is_local`` when a query method is called without it.
    use_local_cache: bool = True
class AmazingDataAdapter:
    """Adapter exposing the AmazingData SDK through one uniform interface.

    The SDK module (``AmazingData``) is imported lazily inside :meth:`connect`.
    Every data method calls :meth:`_check_login` first and therefore raises
    ``RuntimeError`` until :meth:`connect` has succeeded.

    Conventions shared by the query methods:
      * ``is_local=None`` means "use ``config.use_local_cache``".
      * ``start_date`` / ``end_date`` accept ``int`` (20240101), ``str``
        ("20240101", "2024-01-01", "2024/1/1") or ``date``; a falsy value
        means "no bound" and is sent to the SDK as ``None``.
    """

    # Sentinel so _query_info can tell "SDK call takes no code_list" apart
    # from a caller explicitly passing ``codes=None``.
    _NO_CODES = object()

    def __init__(self, config: DataSourceConfig):
        """Store the configuration; nothing is imported or connected here.

        Args:
            config: Credentials, endpoint and cache settings.
        """
        self.config = config
        self._ad = None  # SDK module, set by connect()
        self._base_data = None  # ad.BaseData() instance
        self._market_data = None  # ad.MarketData(calendar) instance
        self._info_data = None  # ad.InfoData() instance
        self._calendar = None  # result of BaseData.get_calendar()
        self._is_logged_in = False

    def connect(self) -> bool:
        """Log in to the SDK and create the data-access objects.

        Returns:
            True on success. Any exception (including a missing SDK package)
            is logged and reported as False instead of being raised.
        """
        try:
            import AmazingData as ad
            self._ad = ad
            ad.login(
                username=self.config.username,
                password=self.config.password,
                host=self.config.host,
                port=self.config.port,
            )
            self._base_data = ad.BaseData()
            self._info_data = ad.InfoData()
            # MarketData is constructed from the trading calendar.
            self._calendar = self._base_data.get_calendar()
            self._market_data = ad.MarketData(self._calendar)
            self._is_logged_in = True
            logger.info("成功连接到 AmazingData 数据源")
            return True
        except Exception as e:
            logger.error(f"连接失败: {e}")
            return False

    def disconnect(self):
        """Log out if connected; logout errors are logged, never raised."""
        if self._is_logged_in and self._ad:
            try:
                self._ad.logout(self.config.username)
                logger.info("已断开与 AmazingData 的连接")
            except Exception as e:
                logger.warning(f"断开连接时出错: {e}")
            self._is_logged_in = False

    # ==================== Basic data ====================
    def get_code_list(self, security_type: SecurityType = SecurityType.STOCK_A) -> List[str]:
        """Return the security codes of one category.

        Futures and ETF options are served by dedicated SDK calls; every
        other category goes through ``BaseData.get_code_list``.

        Args:
            security_type: Category to list.

        Returns:
            List of codes, e.g. ``['000001.SZ', '600000.SH', ...]``.
        """
        self._check_login()
        if security_type == SecurityType.FUTURE:
            return self._base_data.get_future_code_list(security_type=security_type.value)
        elif security_type == SecurityType.ETF_OP:
            return self._base_data.get_option_code_list(security_type=security_type.value)
        else:
            return self._base_data.get_code_list(security_type=security_type.value)

    def get_code_info(self, security_type: SecurityType = SecurityType.STOCK_A) -> pd.DataFrame:
        """Return basic information for every security of one category.

        Returns:
            DataFrame with fields including:
              - symbol: security short name
              - security_status: product status flag
              - pre_close: previous close
              - high_limited: limit-up price
              - low_limited: limit-down price
              - price_tick: minimum price increment
        """
        self._check_login()
        return self._base_data.get_code_info(security_type=security_type.value)

    def get_trading_calendar(self, market: Market = Market.SH) -> List[int]:
        """Return the trading days of a market, e.g. ``[20240102, 20240103, ...]``."""
        self._check_login()
        return self._base_data.get_calendar(market=market.value)

    def get_adj_factor(self, codes: List[str], is_local: Optional[bool] = None) -> pd.DataFrame:
        """Return single-event adjustment factors.

        Args:
            codes: Stock codes.
            is_local: Use the local cache; None falls back to the config.

        Returns:
            DataFrame (index: date, columns: stock code).
        """
        self._check_login()
        return self._base_data.get_adj_factor(
            code_list=codes, local_path=self.config.local_path,
            is_local=self._resolve_is_local(is_local)
        )

    def get_backward_factor(self, codes: List[str], is_local: Optional[bool] = None) -> pd.DataFrame:
        """Return backward adjustment factors.

        Returns:
            DataFrame (index: date, columns: stock code).
        """
        self._check_login()
        return self._base_data.get_backward_factor(
            code_list=codes, local_path=self.config.local_path,
            is_local=self._resolve_is_local(is_local)
        )

    # ==================== Historical market data ====================
    def get_kline(self, codes: Union[str, List[str]],
                  start_date: Union[str, int, date],
                  end_date: Union[str, int, date],
                  period: Period = Period.DAILY) -> Dict[str, pd.DataFrame]:
        """Return historical K-line bars.

        Args:
            codes: One code ('000001.SZ') or a list of codes.
            start_date: First day; int, str or date.
            end_date: Last day; int, str or date.
            period: Bar period (a ``Period`` member).

        Returns:
            Dict[code, DataFrame] with fields open, high, low, close,
            volume and amount.
        """
        self._check_login()
        if isinstance(codes, str):
            codes = [codes]
        start_date = self._format_date(start_date)
        end_date = self._format_date(end_date)
        return self._market_data.query_kline(
            code_list=codes, begin_date=start_date, end_date=end_date,
            # Translate our enum value into the SDK's own Period constant.
            period=getattr(self._ad.constant.Period, period.value).value
        )

    def get_snapshot(self, codes: Union[str, List[str]],
                     start_date: Union[str, int, date],
                     end_date: Union[str, int, date]) -> Dict[str, pd.DataFrame]:
        """Return historical Level-1 snapshots as Dict[code, DataFrame]."""
        self._check_login()
        if isinstance(codes, str):
            codes = [codes]
        start_date = self._format_date(start_date)
        end_date = self._format_date(end_date)
        return self._market_data.query_snapshot(
            code_list=codes, begin_date=start_date, end_date=end_date
        )

    # ==================== Financial statements ====================
    def get_balance_sheet(self, codes: List[str],
                          start_date: Optional[Union[str, int, date]] = None,
                          end_date: Optional[Union[str, int, date]] = None,
                          is_local: Optional[bool] = None) -> Dict[str, pd.DataFrame]:
        """Return balance sheets.

        Args:
            codes: Stock codes.
            start_date: First reporting period (20240930 = Q3 2024 report).
            end_date: Last reporting period.
            is_local: Use the local cache; None falls back to the config.

        Returns:
            Dict[code, DataFrame]; main fields:
              - TOTAL_ASSETS: total assets
              - TOTAL_CUR_ASSETS: total current assets
              - TOTAL_NONCUR_ASSETS: total non-current assets
              - TOTAL_LIAB: total liabilities
              - TOTAL_CUR_LIAB: total current liabilities
              - TOT_SHARE_EQUITY_INCL_MIN_INT: total shareholders' equity
              - CURRENCY_CAP: cash and cash equivalents
              - NOTES_RECEIVABLE: notes receivable
              - ACCT_RECEIVABLE: accounts receivable
              - INV: inventory
              - FIX_ASSETS: fixed assets
              - NOTES_PAYABLE: notes payable
              - ACCT_PAYABLE: accounts payable
              - ST_BORROWING: short-term borrowings
              - LT_LOAN: long-term loans
        """
        return self._get_financial_data('get_balance_sheet', codes, start_date, end_date, is_local)

    def get_cash_flow(self, codes: List[str],
                      start_date: Optional[Union[str, int, date]] = None,
                      end_date: Optional[Union[str, int, date]] = None,
                      is_local: Optional[bool] = None) -> Dict[str, pd.DataFrame]:
        """Return cash-flow statements.

        Main fields:
          - NET_CASH_FLOWS_OPERA_ACT: net cash flow from operating activities
          - NET_CASH_FLOWS_INV_ACT: net cash flow from investing activities
          - NET_CASH_FLOWS_FIN_ACT: net cash flow from financing activities
          - NET_INCR_CASH_AND_CASH_EQU: net increase in cash and equivalents
          - CASH_RECP_SG_AND_RS: cash received from sales of goods/services
          - CASH_PAY_GOODS_SERVICES: cash paid for goods/services
        """
        return self._get_financial_data('get_cash_flow', codes, start_date, end_date, is_local)

    def get_income_statement(self, codes: List[str],
                             start_date: Optional[Union[str, int, date]] = None,
                             end_date: Optional[Union[str, int, date]] = None,
                             is_local: Optional[bool] = None) -> Dict[str, pd.DataFrame]:
        """Return income statements (SDK call ``get_income``).

        Main fields:
          - TOT_OPERA_REV: total operating revenue
          - OPERA_REV: operating revenue
          - TOT_OPERA_COST: total operating cost
          - OPERA_PROFIT: operating profit
          - TOTAL_PROFIT: total profit
          - NET_PRO_INCL_MIN_INT_INC: net profit
          - BASIC_EPS: basic earnings per share
          - DILUTED_EPS: diluted earnings per share
          - RD_EXP: R&D expense
          - LESS_SELLING_EXP: selling expense
          - LESS_ADMIN_EXP: administrative expense
          - LESS_FIN_EXP: financial expense
        """
        return self._get_financial_data('get_income', codes, start_date, end_date, is_local)

    def get_profit_express(self, codes: List[str],
                           start_date: Optional[Union[str, int, date]] = None,
                           end_date: Optional[Union[str, int, date]] = None,
                           is_local: Optional[bool] = None) -> pd.DataFrame:
        """Return preliminary earnings reports.

        Main fields:
          - TOTAL_ASSETS: total assets
          - NET_PRO_EXCL_MIN_INT_INC: net profit
          - TOT_OPERA_REV: total operating revenue
          - TOTAL_PROFIT: total profit
          - OPERA_PROFIT: operating profit
          - EPS_BASIC: basic earnings per share
          - ROE_WEIGHTED: weighted return on equity
          - YOY_GR_NET_PROFIT_PARENT: YoY growth of net profit attributable
            to parent-company shareholders
        """
        return self._query_info('get_profit_express', start_date, end_date, is_local, codes)

    def get_profit_notice(self, codes: List[str],
                          start_date: Optional[Union[str, int, date]] = None,
                          end_date: Optional[Union[str, int, date]] = None,
                          is_local: Optional[bool] = None) -> pd.DataFrame:
        """Return earnings forecasts.

        Main fields:
          - P_TYPECODE: forecast type code
          - P_CHANGE_MAX: upper bound of forecast net-profit change
          - P_CHANGE_MIN: lower bound of forecast net-profit change
          - NET_PROFIT_MAX: forecast net profit, upper bound (10k CNY)
          - NET_PROFIT_MIN: forecast net profit, lower bound (10k CNY)
          - P_REASON: reason for the change
        """
        return self._query_info('get_profit_notice', start_date, end_date, is_local, codes)

    # ==================== Shareholders and share capital ====================
    def get_top10_shareholders(self, codes: List[str],
                               start_date: Optional[Union[str, int, date]] = None,
                               end_date: Optional[Union[str, int, date]] = None,
                               is_local: Optional[bool] = None) -> pd.DataFrame:
        """Return top-ten shareholder data (SDK call ``get_share_holder``).

        Main fields:
          - HOLDER_NAME: shareholder name
          - HOLDER_QUANTITY: shares held
          - HOLDER_PCT: holding ratio (%)
          - HOLDER_HOLDER_CATEGORY: holder type (1: individual, 2: company)
          - FLOAT_QTY: tradable shares held
        """
        return self._query_info('get_share_holder', start_date, end_date, is_local, codes)

    def get_shareholder_count(self, codes: List[str],
                              start_date: Optional[Union[str, int, date]] = None,
                              end_date: Optional[Union[str, int, date]] = None,
                              is_local: Optional[bool] = None) -> pd.DataFrame:
        """Return shareholder-count data (SDK call ``get_holder_num``).

        Main fields:
          - HOLDER_TOTAL_NUM: total holders across A/B/H/overseas shares
          - HOLDER_NUM: number of A-share holders
        """
        return self._query_info('get_holder_num', start_date, end_date, is_local, codes)

    def get_equity_structure(self, codes: List[str],
                             start_date: Optional[Union[str, int, date]] = None,
                             end_date: Optional[Union[str, int, date]] = None,
                             is_local: Optional[bool] = None) -> pd.DataFrame:
        """Return share-capital structure.

        Main fields:
          - TOT_SHARE: total shares (10k shares)
          - FLOAT_SHARE: tradable shares (10k shares)
          - FLOAT_A_SHARE: tradable A-shares (10k shares)
          - RESTRICTED_A_SHARE: restricted A-shares (10k shares)
          - TOT_RESTRICTED_SHARE: total restricted shares
        """
        return self._query_info('get_equity_structure', start_date, end_date, is_local, codes)

    # ==================== Margin trading ====================
    def get_margin_summary(self,
                           start_date: Optional[Union[str, int, date]] = None,
                           end_date: Optional[Union[str, int, date]] = None,
                           is_local: Optional[bool] = None) -> pd.DataFrame:
        """Return the market-wide margin-trading summary (no code list).

        Main fields:
          - TRADE_DATE: trade date
          - SUM_BORROW_MONEY_BALANCE: financing balance (CNY)
          - SUM_PURCH_WITH_BORROW_MONEY: financing purchases (CNY)
          - SUM_REPAYMENT_OF_BORROW_MONEY: financing repayments (CNY)
          - SUM_SEC_LENDING_BALANCE: securities-lending balance (CNY)
          - SUM_SALES_OF_BORROWED_SEC: securities-lending sell volume
          - SUM_MARGIN_TRADE_BALANCE: total margin balance (CNY)
        """
        return self._query_info('get_margin_summary', start_date, end_date, is_local)

    def get_margin_detail(self, codes: List[str],
                          start_date: Optional[Union[str, int, date]] = None,
                          end_date: Optional[Union[str, int, date]] = None,
                          is_local: Optional[bool] = None) -> Dict[str, pd.DataFrame]:
        """Return per-security margin-trading detail.

        Main fields:
          - BORROW_MONEY_BALANCE: financing balance
          - PURCH_WITH_BORROW_MONEY: financing purchases
          - REPAYMENT_OF_BORROW_MONEY: financing repayments
          - SEC_LENDING_BALANCE: securities-lending balance
          - SALES_OF_BORROWED_SEC: securities-lending sell volume
        """
        return self._query_info('get_margin_detail', start_date, end_date, is_local, codes)

    # ==================== Unusual trading activity ====================
    def get_longhu_bang(self, codes: List[str],
                        start_date: Optional[Union[str, int, date]] = None,
                        end_date: Optional[Union[str, int, date]] = None,
                        is_local: Optional[bool] = None) -> pd.DataFrame:
        """Return "dragon-tiger list" data (SDK call ``get_long_hu_bang``).

        Main fields:
          - TRADE_DATE: trade date
          - REASON_TYPE_NAME: reason for being listed
          - CHANGE_RANGE: price change (%)
          - TRADER_NAME: brokerage branch name
          - BUY_AMOUNT: buy amount (CNY)
          - SELL_AMOUNT: sell amount (CNY)
          - FLOW_MARK: side marker (1 buy, 2 sell)
        """
        return self._query_info('get_long_hu_bang', start_date, end_date, is_local, codes)

    def get_block_trading(self, codes: List[str],
                          start_date: Optional[Union[str, int, date]] = None,
                          end_date: Optional[Union[str, int, date]] = None,
                          is_local: Optional[bool] = None) -> pd.DataFrame:
        """Return block-trade data.

        Main fields:
          - TRADE_DATE: trade date
          - B_SHARE_PRICE: trade price (CNY)
          - B_SHARE_VOLUME: volume (10k shares)
          - B_SHARE_AMOUNT: amount (10k CNY)
          - B_BUYER_NAME: buyer's brokerage branch
          - B_SELLER_NAME: seller's brokerage branch
        """
        return self._query_info('get_block_trading', start_date, end_date, is_local, codes)

    # ==================== Index data ====================
    def get_index_constituents(self, codes: List[str],
                               is_local: Optional[bool] = None) -> Dict[str, pd.DataFrame]:
        """Return index constituents.

        Args:
            codes: Index codes, e.g. ``['000300.SH', '000905.SH']``.
            is_local: Use the local cache; None falls back to the config.

        Returns:
            Dict[index code, DataFrame]; fields:
              - INDEX_CODE: index code
              - CON_CODE: constituent code
              - INDATE: inclusion date
              - OUTDATE: removal date
              - INDEX_NAME: index name
        """
        self._check_login()
        return self._info_data.get_index_constituent(
            code_list=codes, local_path=self.config.local_path,
            is_local=self._resolve_is_local(is_local)
        )

    def get_index_weights(self, codes: List[str],
                          start_date: Optional[Union[str, int, date]] = None,
                          end_date: Optional[Union[str, int, date]] = None,
                          is_local: Optional[bool] = None) -> Dict[str, pd.DataFrame]:
        """Return index constituent weights (SDK call ``get_index_weight``).

        Supported indices:
          - 000016.SH: SSE 50
          - 000300.SH: CSI 300
          - 000905.SH: CSI 500
          - 000906.SH: CSI 800
          - 000852.SH: CSI 1000

        DataFrame fields:
          - INDEX_CODE: index code
          - CON_CODE: constituent code
          - TRADE_DATE: effective date
          - WEIGHT: weight (%)
          - CLOSE: close price
        """
        return self._query_info('get_index_weight', start_date, end_date, is_local, codes)

    # ==================== ETF data ====================
    def get_etf_pcf(self, codes: List[str]) -> Tuple[pd.DataFrame, Dict[str, pd.DataFrame]]:
        """Return ETF creation/redemption (PCF) data.

        Returns:
            ``(etf_info, etf_constituents)``.
            etf_info fields:
              - creation_redemption_unit: ETF units per basket
              - max_cash_ratio: maximum cash-substitution ratio
              - creation: whether creation is allowed
              - redemption: whether redemption is allowed
            etf_constituents fields:
              - underlying_symbol: constituent short name
              - component_share: constituent quantity
              - substitute_flag: cash-substitution flag
        """
        self._check_login()
        return self._base_data.get_etf_pcf(code_list=codes)

    def get_fund_share(self, codes: List[str],
                       start_date: Optional[Union[str, int, date]] = None,
                       end_date: Optional[Union[str, int, date]] = None,
                       is_local: Optional[bool] = None) -> Dict[str, pd.DataFrame]:
        """Return fund share data.

        Main fields:
          - FUND_SHARE: fund shares (10k units)
          - TOTAL_SHARE: total fund shares (10k units)
          - FLOAT_SHARE: tradable shares (10k units)
          - CHANGE_REASON: reason for the share change
        """
        return self._query_info('get_fund_share', start_date, end_date, is_local, codes)

    # ==================== Convertible bonds ====================
    def get_kzz_issuance(self, codes: List[str],
                         is_local: Optional[bool] = None) -> Dict[str, pd.DataFrame]:
        """Return convertible-bond issuance data.

        Main fields:
          - STOCK_CODE: underlying stock code
          - LISTED_DATE: listing date
          - PLAN_SCHEDULE: plan progress
          - CLAUSE_INI_CONV_PRICE: initial conversion price
          - LIST_ISSUE_SIZE: issue size (10k CNY)
          - LIST_ISSUE_QUANTITY: issue quantity (10k bonds)
          - TERM_YEAR: term (years)
          - COUPON_RATE: coupon rate (%)
        """
        self._check_login()
        return self._info_data.get_kzz_issuance(
            code_list=codes, local_path=self.config.local_path,
            is_local=self._resolve_is_local(is_local)
        )

    # ==================== Helpers ====================
    def _check_login(self):
        """Raise ``RuntimeError`` unless :meth:`connect` has succeeded."""
        if not self._is_logged_in:
            raise RuntimeError("未连接到数据源,请先调用 connect()")

    def _resolve_is_local(self, is_local: Optional[bool]) -> bool:
        """Return ``is_local``, or the configured default when it is None."""
        return self.config.use_local_cache if is_local is None else is_local

    def _format_date(self, d: Union[str, int, date]) -> int:
        """Convert a date-like value to a YYYYMMDD integer.

        Args:
            d: ``int`` (returned unchanged), ``str`` with optional ``-`` or
                ``/`` separators, or ``date``/``datetime``.

        Returns:
            The date as an integer such as 20240105.

        Raises:
            ValueError: For unsupported types or non-numeric strings.
        """
        if isinstance(d, int):
            return d
        if isinstance(d, str):
            parts = d.strip().replace("/", "-").split("-")
            if len(parts) == 3:
                # Zero-pad month/day so "2024-1-5" becomes 20240105 rather
                # than the 6-digit 202415 plain separator stripping yields.
                year, month, day = parts
                parts = [year, month.zfill(2), day.zfill(2)]
            return int("".join(parts))
        if isinstance(d, date):
            return int(d.strftime("%Y%m%d"))
        raise ValueError(f"不支持的日期格式: {d}")

    def _query_info(self, method: str,
                    start_date: Optional[Union[str, int, date]] = None,
                    end_date: Optional[Union[str, int, date]] = None,
                    is_local: Optional[bool] = None,
                    codes=_NO_CODES):
        """Call a date-ranged, cache-aware ``InfoData`` method by name.

        Single home for the login check, cache-flag defaulting and date
        normalisation that every InfoData query needs.

        Args:
            method: Name of the method on the SDK ``InfoData`` object.
            start_date: Lower bound; falsy means unbounded (``None``).
            end_date: Upper bound; falsy means unbounded (``None``).
            is_local: Use the local cache; None falls back to the config.
            codes: Forwarded as ``code_list``; omitted entirely when left at
                the sentinel (for calls such as ``get_margin_summary``).

        Returns:
            Whatever the SDK method returns.
        """
        self._check_login()
        sdk_call = getattr(self._info_data, method)
        params = {
            "local_path": self.config.local_path,
            "is_local": self._resolve_is_local(is_local),
            "begin_date": self._format_date(start_date) if start_date else None,
            "end_date": self._format_date(end_date) if end_date else None,
        }
        if codes is not self._NO_CODES:
            params["code_list"] = codes
        return sdk_call(**params)

    def _get_financial_data(self, method: str, codes: List[str],
                            start_date: Optional[Union[str, int, date]] = None,
                            end_date: Optional[Union[str, int, date]] = None,
                            is_local: Optional[bool] = None) -> Dict[str, pd.DataFrame]:
        """Fetch one financial statement; thin wrapper over :meth:`_query_info`."""
        return self._query_info(method, start_date, end_date, is_local, codes)
def create_adapter(username: str, password: str, host: str, port: int,
                   local_path: str = "./amazing_data_cache/",
                   use_local_cache: bool = True) -> AmazingDataAdapter:
    """Build an (unconnected) adapter from plain arguments.

    Args:
        username: Login user name.
        password: Login password.
        host: Server host.
        port: Server port.
        local_path: Directory handed to the SDK as its cache path.
        use_local_cache: Default for ``is_local`` on cache-aware queries.

    Returns:
        A new ``AmazingDataAdapter``; call ``connect()`` before querying.
    """
    settings = {
        "username": username,
        "password": password,
        "host": host,
        "port": port,
        "local_path": local_path,
        "use_local_cache": use_local_cache,
    }
    return AmazingDataAdapter(DataSourceConfig(**settings))
```
---
## 二、接口详细说明
### 1. 基础数据接口
| 接口名 | 功能 | 返回类型 | 主要字段 |
|--------|------|----------|----------|
| `get_code_list` | 获取代码列表 | `List[str]` | 代码列表 |
| `get_code_info` | 获取证券信息 | `DataFrame` | symbol, pre_close, high_limited, low_limited |
| `get_trading_calendar` | 获取交易日历 | `List[int]` | 交易日列表 |
| `get_adj_factor` | 单次复权因子 | `DataFrame` | index=日期, columns=代码 |
| `get_backward_factor` | 后复权因子 | `DataFrame` | index=日期, columns=代码 |
### 2. 历史行情接口
| 接口名 | 功能 | 返回类型 | 主要字段 |
|--------|------|----------|----------|
| `get_kline` | K线数据 | `Dict[str, DataFrame]` | open, high, low, close, volume, amount |
| `get_snapshot` | 历史快照 | `Dict[str, DataFrame]` | Level-1行情数据 |
**Period 周期选项:**
- `Period.MIN1` / `Period.MIN5` / `Period.MIN15` / `Period.MIN30` / `Period.MIN60`
- `Period.DAILY` / `Period.WEEKLY` / `Period.MONTHLY`
### 3. 财务数据接口
| 接口名 | 功能 | 返回类型 | 主要字段 |
|--------|------|----------|----------|
| `get_balance_sheet` | 资产负债表 | `Dict[str, DataFrame]` | TOTAL_ASSETS, TOTAL_LIAB, TOT_SHARE_EQUITY |
| `get_cash_flow` | 现金流量表 | `Dict[str, DataFrame]` | NET_CASH_FLOWS_OPERA_ACT, NET_CASH_FLOWS_INV_ACT |
| `get_income_statement` | 利润表 | `Dict[str, DataFrame]` | TOT_OPERA_REV, NET_PRO_INCL_MIN_INT_INC, BASIC_EPS |
| `get_profit_express` | 业绩快报 | `DataFrame` | ROE_WEIGHTED, YOY_GR_NET_PROFIT_PARENT |
| `get_profit_notice` | 业绩预告 | `DataFrame` | P_CHANGE_MAX, P_CHANGE_MIN, NET_PROFIT_MAX |
### 4. 股东股本接口
| 接口名 | 功能 | 返回类型 | 主要字段 |
|--------|------|----------|----------|
| `get_top10_shareholders` | 十大股东 | `DataFrame` | HOLDER_NAME, HOLDER_QUANTITY, HOLDER_PCT |
| `get_shareholder_count` | 股东户数 | `DataFrame` | HOLDER_TOTAL_NUM, HOLDER_NUM |
| `get_equity_structure` | 股本结构 | `DataFrame` | TOT_SHARE, FLOAT_SHARE, RESTRICTED_A_SHARE |
### 5. 融资融券接口
| 接口名 | 功能 | 返回类型 | 主要字段 |
|--------|------|----------|----------|
| `get_margin_summary` | 融资融券汇总 | `DataFrame` | SUM_BORROW_MONEY_BALANCE, SUM_MARGIN_TRADE_BALANCE |
| `get_margin_detail` | 个股融资融券 | `Dict[str, DataFrame]` | BORROW_MONEY_BALANCE, SEC_LENDING_BALANCE |
### 6. 交易异动接口
| 接口名 | 功能 | 返回类型 | 主要字段 |
|--------|------|----------|----------|
| `get_longhu_bang` | 龙虎榜 | `DataFrame` | TRADER_NAME, BUY_AMOUNT, SELL_AMOUNT, FLOW_MARK |
| `get_block_trading` | 大宗交易 | `DataFrame` | B_SHARE_PRICE, B_SHARE_VOLUME, B_BUYER_NAME |
### 7. 指数数据接口
| 接口名 | 功能 | 返回类型 | 主要字段 |
|--------|------|----------|----------|
| `get_index_constituents` | 指数成分股 | `Dict[str, DataFrame]` | INDEX_CODE, CON_CODE, INDATE, OUTDATE |
| `get_index_weights` | 成分股权重 | `Dict[str, DataFrame]` | INDEX_CODE, CON_CODE, WEIGHT |
**支持的指数代码:**
- `000016.SH` - 上证50
- `000300.SH` - 沪深300
- `000905.SH` - 中证500
- `000906.SH` - 中证800
- `000852.SH` - 中证1000
### 8. ETF数据接口
| 接口名 | 功能 | 返回类型 | 主要字段 |
|--------|------|----------|----------|
| `get_etf_pcf` | ETF申赎数据 | `Tuple[DataFrame, Dict]` | creation_redemption_unit, component_share |
| `get_fund_share` | 基金份额 | `Dict[str, DataFrame]` | FUND_SHARE, TOTAL_SHARE |
### 9. 可转债数据接口
| 接口名 | 功能 | 返回类型 | 主要字段 |
|--------|------|----------|----------|
| `get_kzz_issuance` | 可转债发行 | `Dict[str, DataFrame]` | CLAUSE_INI_CONV_PRICE, LIST_ISSUE_SIZE, COUPON_RATE |
---
## 三、使用示例
### 基础使用
```python
from amazing_data_adapter import create_adapter, SecurityType, Period
# 创建并连接
adapter = create_adapter(
username='your_username',
password='your_password',
host='your_host',
port=8080
)
if adapter.connect():
# 获取A股列表
codes = adapter.get_code_list(SecurityType.STOCK_A)
print(f"A股数量: {len(codes)}")
# 获取平安银行K线
kline = adapter.get_kline(
codes=['000001.SZ'],
start_date='20240101',
end_date='20241231',
period=Period.DAILY
)
print(kline['000001.SZ'].head())
adapter.disconnect()
```
### 获取财务数据
```python
# 获取资产负债表
balance = adapter.get_balance_sheet(
codes=['000001.SZ', '600000.SH'],
start_date=20240930,
end_date=20240930
)
for code, df in balance.items():
print(f"{code} 总资产: {df['TOTAL_ASSETS'].values[0]}")
```
### 获取指数成分股
```python
# 沪深300成分股
constituents = adapter.get_index_constituents(['000300.SH'])
df = constituents['000300.SH']
print(f"成分股数量: {len(df)}")
print(df[['CON_CODE', 'INDATE']].head())
```
### 批量处理
```python
# 分批获取数据避免超时
all_codes = adapter.get_code_list(SecurityType.STOCK_A)
batch_size = 50
for i in range(0, 100, batch_size): # 只取前100只演示
batch = all_codes[i:i+batch_size]
data = adapter.get_balance_sheet(batch)
print(f"已处理 {i+len(batch)} 只股票")
```
---
## 四、数据结构说明
### K线数据字段
| 字段名 | 类型 | 说明 |
|--------|------|------|
| open | float | 开盘价 |
| high | float | 最高价 |
| low | float | 最低价 |
| close | float | 收盘价 |
| volume | int | 成交量(股) |
| amount | float | 成交金额(元) |
### 资产负债表关键字段
| 字段名 | 说明 |
|--------|------|
| TOTAL_ASSETS | 资产总计 |
| TOTAL_CUR_ASSETS | 流动资产合计 |
| TOTAL_NONCUR_ASSETS | 非流动资产合计 |
| TOTAL_LIAB | 负债合计 |
| TOT_SHARE_EQUITY_INCL_MIN_INT | 股东权益合计(含少数股东) |
| CURRENCY_CAP | 货币资金 |
| NOTES_RECEIVABLE | 应收票据 |
| ACCT_RECEIVABLE | 应收账款 |
| INV | 存货 |
### 利润表关键字段
| 字段名 | 说明 |
|--------|------|
| TOT_OPERA_REV | 营业总收入 |
| OPERA_PROFIT | 营业利润 |
| TOTAL_PROFIT | 利润总额 |
| NET_PRO_INCL_MIN_INT_INC | 净利润(含少数股东) |
| BASIC_EPS | 基本每股收益 |
| DILUTED_EPS | 稀释每股收益 |
### 现金流量表关键字段
| 字段名 | 说明 |
|--------|------|
| NET_CASH_FLOWS_OPERA_ACT | 经营活动现金流净额 |
| NET_CASH_FLOWS_INV_ACT | 投资活动现金流净额 |
| NET_CASH_FLOWS_FIN_ACT | 筹资活动现金流净额 |
| NET_INCR_CASH_AND_CASH_EQU | 现金及现金等价物净增加额 |
---
## 五、注意事项
1. **连接管理**: 使用前先调用 `connect()`,使用后调用 `disconnect()`
2. **日期格式**: 支持 `20240101`、`"2024-01-01"` 或 `date` 对象
3. **本地缓存**: 默认启用,可设置 `is_local=False` 强制从服务器获取
4. **批量处理**: 大量数据建议分批获取,每批 50-100 个代码
5. **错误处理**: 未连接(尚未调用 `connect()` 或已调用 `disconnect()`)时调用任何数据接口都会抛出 `RuntimeError`;`connect()` 本身失败时返回 `False` 而不抛出异常,需做好异常处理
---
**文件保存位置**: `/root/.openclaw/workspace/amazing_data_adapter.py`

Binary file not shown.

@ -0,0 +1,268 @@
# AmazingData 数据源适配器
基于中国银河证券星耀数智量化平台 SDK 的封装,提供统一、简洁的金融数据获取接口。
## 功能特性
- **简洁的API设计**: 封装复杂的SDK接口,提供直观的数据获取方法
- **类型安全**: 使用Python类型注解,提供IDE友好的代码提示
- **灵活的配置**: 支持本地缓存、参数自定义等配置选项
- **全面的数据覆盖**: 支持行情、财务、股本、融资融券等多类金融数据
## 安装依赖
```bash
# 安装 AmazingData SDK (需从银河证券获取)
pip install tgw-1.*.*-py3-none-any.whl
pip install AmazingData-1.*.*-cp3x-none-any.whl
# 安装其他依赖
pip install pandas
```
## 快速开始
```python
from amazing_data_adapter import create_adapter, SecurityType, Period
# 1. 创建适配器
adapter = create_adapter(
username='your_username',
password='your_password',
host='your_host',
port=8080,
local_path='./data_cache/', # 本地缓存路径
use_local_cache=True # 是否使用本地缓存
)
# 2. 连接数据源
if adapter.connect():
# 3. 获取数据
codes = adapter.get_code_list(SecurityType.STOCK_A)
kline = adapter.get_kline(
codes=['000001.SZ'],
start_date='20240101',
end_date='20241231',
period=Period.DAILY
)
# 4. 断开连接
adapter.disconnect()
```
## 功能模块
### 1. 基础数据
| 方法 | 说明 |
|------|------|
| `get_code_list(security_type)` | 获取代码列表 |
| `get_code_info(security_type)` | 获取证券基本信息 |
| `get_trading_calendar(market)` | 获取交易日历 |
| `get_adj_factor(codes)` | 获取单次复权因子 |
| `get_backward_factor(codes)` | 获取后复权因子 |
### 2. 历史行情数据
| 方法 | 说明 |
|------|------|
| `get_kline(codes, start_date, end_date, period)` | 获取K线数据 |
| `get_snapshot(codes, start_date, end_date)` | 获取历史快照 |
**支持的周期 (Period)**:
- `Period.MIN1` - 1分钟
- `Period.MIN5` - 5分钟
- `Period.MIN15` - 15分钟
- `Period.MIN30` - 30分钟
- `Period.MIN60` - 60分钟
- `Period.DAILY` - 日线
- `Period.WEEKLY` - 周线
- `Period.MONTHLY` - 月线
### 3. 财务数据
| 方法 | 说明 |
|------|------|
| `get_balance_sheet(codes, start_date, end_date)` | 资产负债表 |
| `get_cash_flow(codes, start_date, end_date)` | 现金流量表 |
| `get_income_statement(codes, start_date, end_date)` | 利润表 |
| `get_profit_express(codes, start_date, end_date)` | 业绩快报 |
| `get_profit_notice(codes, start_date, end_date)` | 业绩预告 |
### 4. 股东股本数据
| 方法 | 说明 |
|------|------|
| `get_top10_shareholders(codes, start_date, end_date)` | 十大股东 |
| `get_shareholder_count(codes, start_date, end_date)` | 股东户数 |
| `get_equity_structure(codes, start_date, end_date)` | 股本结构 |
### 5. 融资融券数据
| 方法 | 说明 |
|------|------|
| `get_margin_summary(start_date, end_date)` | 融资融券汇总 |
| `get_margin_detail(codes, start_date, end_date)` | 个股融资融券明细 |
### 6. 交易异动数据
| 方法 | 说明 |
|------|------|
| `get_longhu_bang(codes, start_date, end_date)` | 龙虎榜数据 |
| `get_block_trading(codes, start_date, end_date)` | 大宗交易 |
### 7. 指数数据
| 方法 | 说明 |
|------|------|
| `get_index_constituents(codes)` | 指数成分股 |
| `get_index_weights(codes, start_date, end_date)` | 成分股权重 |
**支持的指数**:
- `000016.SH` - 上证50
- `000300.SH` - 沪深300
- `000905.SH` - 中证500
- `000906.SH` - 中证800
- `000852.SH` - 中证1000
### 8. ETF数据
| 方法 | 说明 |
|------|------|
| `get_etf_pcf(codes)` | ETF申赎数据 |
| `get_fund_share(codes, start_date, end_date)` | 基金份额 |
### 9. 可转债数据
| 方法 | 说明 |
|------|------|
| `get_kzz_issuance(codes)` | 可转债发行数据 |
## 证券类型枚举
```python
from amazing_data_adapter import SecurityType
SecurityType.STOCK_A # 沪深A股
SecurityType.STOCK_A_SH_SZ # 沪深A股沪深
SecurityType.INDEX_A # 沪深指数
SecurityType.ETF # ETF
SecurityType.FUTURE # 期货
SecurityType.KZZ # 可转债
SecurityType.GLRA # 逆回购
SecurityType.HKT # 港股通
SecurityType.ETF_OP # ETF期权
```
## 使用示例
### 获取历史K线数据
```python
# 获取多只股票日线数据
kline_data = adapter.get_kline(
codes=['000001.SZ', '600000.SH'],
start_date='20240101',
end_date='20241231',
period=Period.DAILY
)
for code, df in kline_data.items():
print(f"{code}: {len(df)} 条数据")
print(df.head())
```
### 获取财务报表
```python
# 获取资产负债表
balance_sheet = adapter.get_balance_sheet(
codes=['000001.SZ', '600000.SH'],
start_date=20240101,
end_date=20241231
)
for code, df in balance_sheet.items():
print(f"\n{code} 资产负债表:")
print(df[['REPORTING_PERIOD', 'TOTAL_ASSETS', 'TOTAL_CUR_ASSETS']])
```
### 获取指数成分股
```python
# 获取沪深300成分股
constituents = adapter.get_index_constituents(['000300.SH'])
df = constituents['000300.SH']
print(f"沪深300成分股数量: {len(df)}")
print(df[['CON_CODE', 'INDATE', 'INDEX_NAME']].head())
```
### 批量数据处理
```python
# 获取所有A股代码
all_codes = adapter.get_code_list(SecurityType.STOCK_A)
# 分批处理避免超时
batch_size = 50
for i in range(0, len(all_codes), batch_size):
batch_codes = all_codes[i:i+batch_size]
data = adapter.get_balance_sheet(batch_codes)
# 处理数据...
```
### 结合复权因子计算真实价格
```python
# 获取K线和复权因子
kline = adapter.get_kline(['000001.SZ'], '20240101', '20241231')
adj_factor = adapter.get_backward_factor(['000001.SZ'])
df = kline['000001.SZ']
# 合并并计算复权价格
df['trade_date'] = df.index.strftime('%Y%m%d').astype(int)
df = df.merge(adj_factor[['000001.SZ']].reset_index(),
left_on='trade_date', right_on='index')
df['adj_close'] = df['close'] * df['000001.SZ']
```
## 数据缓存
适配器支持本地数据缓存,可大幅提升重复查询的速度:
```python
adapter = create_adapter(
username='xxx',
password='xxx',
host='xxx',
port=8080,
local_path='./my_data_cache/', # 缓存目录
use_local_cache=True # 默认启用缓存
)
# 强制从服务器获取最新数据(仅带 is_local 参数的接口支持,例如财务数据接口;get_kline 不接受该参数)
adapter.get_balance_sheet(codes, start_date, end_date, is_local=False)
```
## 注意事项
1. **账号权限**: 使用本适配器需要先向中国银河证券申请开通星耀数智平台权限
2. **日期格式**: 支持多种日期格式:
- `int`: 20240101
- `str`: "2024-01-01" 或 "20240101"
- `date`: datetime.date(2024, 1, 1)
3. **错误处理**: 未连接(尚未调用 `connect()` 或已调用 `disconnect()`)时,所有数据接口都会抛出 `RuntimeError`,建议在外层做好异常处理
4. **资源释放**: 使用完毕后请调用 `adapter.disconnect()` 断开连接
## 文件说明
- `amazing_data_adapter.py` - 适配器主代码
- `amazing_data_examples.py` - 详细使用示例
- `README.md` - 本文档
## API参考
详细的SDK接口文档请参考银河证券提供的《AmazingData开发手册》。

@ -0,0 +1,840 @@
"""
AmazingData 数据源适配器
基于银河证券星耀数智量化平台 SDK 的封装
提供统一简洁的金融数据获取接口
"""
from calendar import calendar
import pandas as pd
from typing import List, Dict, Optional, Union, Tuple
from datetime import datetime, date
from dataclasses import dataclass
from enum import Enum
import logging
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class SecurityType(Enum):
    """Security categories; each value is the string passed to the SDK as ``security_type``."""
    STOCK_A = "EXTRA_STOCK_A"  # Shanghai/Shenzhen A-shares
    # A-shares, SH/SZ variant — NOTE(review): how this differs from STOCK_A is not
    # visible here (original note: "沪深A股沪深"); confirm against the SDK manual.
    STOCK_A_SH_SZ = "EXTRA_STOCK_A_SH_SZ"
    INDEX_A = "EXTRA_INDEX_A"  # Shanghai/Shenzhen indices
    ETF = "EXTRA_ETF"  # ETFs
    FUTURE = "EXTRA_FUTURE"  # futures
    KZZ = "EXTRA_KZZ"  # convertible bonds
    GLRA = "EXTRA_GLRA"  # reverse repos
    HKT = "EXTRA_HKT"  # Hong Kong Stock Connect
    ETF_OP = "EXTRA_ETF_OP"  # ETF options
class Market(Enum):
    """Exchange identifiers; the value is passed to the SDK as ``market``."""
    SH = "SH"  # Shanghai
    SZ = "SZ"  # Shenzhen
    BJ = "BJ"  # Beijing
class Period(Enum):
    """K-line bar periods.

    NOTE(review): the values look like attribute names of the SDK's
    ``constant.Period`` (the documented copy of ``get_kline`` resolves them
    with ``getattr``) — confirm before renaming any value.
    """
    # Intraday bars, in minutes.
    MIN1 = "min1"
    MIN3 = "min3"
    MIN5 = "min5"
    MIN10 = "min10"
    MIN15 = "min15"
    MIN30 = "min30"
    MIN60 = "min60"
    MIN120 = "min120"
    # Daily and longer bars.
    DAILY = "daily"
    WEEKLY = "weekly"
    MONTHLY = "monthly"
    SEASON = "season"  # presumably quarterly — TODO confirm
    YEAR = "year"
@dataclass
class DataSourceConfig:
    """Connection and cache settings consumed by ``AmazingDataAdapter``."""
    # Credentials and endpoint forwarded to the SDK's login() call.
    username: str
    password: str
    host: str
    port: int
    # Directory handed to the SDK as ``local_path`` on cache-aware queries.
    local_path: str = "./amazing_data_cache/"
    # Default for ``is_local`` when a query method is called without it.
    use_local_cache: bool = True
class AmazingDataAdapter:
"""
AmazingData 数据源适配器
封装银河证券星耀数智 SDK提供统一的数据获取接口
"""
def __init__(self, config: DataSourceConfig):
"""
初始化适配器
Args:
config: 数据源配置
"""
self.config = config
self._ad = None
self._base_data = None
self._market_data = None
self._info_data = None
self._calendar = None
self._is_logged_in = False
def connect(self) -> bool:
    """Log in to the SDK and build the data-access objects.

    Returns:
        bool: True when login and initialisation succeeded; False when
        anything raised (the error is logged, not propagated).
    """
    try:
        import AmazingData as sdk
        self._ad = sdk
        cfg = self.config
        # Log in with the configured credentials and endpoint.
        sdk.login(
            username=cfg.username,
            password=cfg.password,
            host=cfg.host,
            port=cfg.port,
        )
        # Create the data-access objects; MarketData needs the calendar.
        self._base_data = sdk.BaseData()
        self._info_data = sdk.InfoData()
        trading_days = self._base_data.get_calendar()
        self._calendar = trading_days
        self._market_data = sdk.MarketData(trading_days)
        self._is_logged_in = True
        logger.info("成功连接到 AmazingData 数据源")
        return True
    except Exception as exc:
        logger.error(f"连接失败: {exc}")
        return False
def disconnect(self):
"""断开连接"""
if self._is_logged_in and self._ad:
try:
self._ad.logout(self.config.username)
logger.info("已断开与 AmazingData 的连接")
except Exception as e:
logger.warning(f"断开连接时出错: {e}")
self._is_logged_in = False
# ==================== 基础数据接口 ====================
def get_code_list(self, security_type: SecurityType = SecurityType.STOCK_A) -> List[str]:
"""
获取代码列表
Args:
security_type: 证券类型
Returns:
证券代码列表
"""
self._check_login()
if security_type == SecurityType.FUTURE:
return self._base_data.get_future_code_list(security_type=security_type.value)
elif security_type == SecurityType.ETF_OP:
return self._base_data.get_option_code_list(security_type=security_type.value)
else:
return self._base_data.get_code_list(security_type=security_type.value)
def get_code_info(self, security_type: SecurityType = SecurityType.STOCK_A) -> pd.DataFrame:
"""
获取证券信息
Args:
security_type: 证券类型
Returns:
DataFrame 包含证券基本信息
"""
self._check_login()
return self._base_data.get_code_info(security_type=security_type.value)
def get_trading_calendar(self, market: Market = Market.SH) -> List[int]:
"""
获取交易日历
Args:
market: 市场
Returns:
交易日列表 (YYYYMMDD 格式)
"""
self._check_login()
return self._base_data.get_calendar(market=market.value)
def get_adj_factor(self, codes: List[str],
is_local: Optional[bool] = None) -> pd.DataFrame:
"""
获取复权因子单次复权
Args:
codes: 股票代码列表
is_local: 是否使用本地缓存
Returns:
DataFrame (index: 日期, columns: 股票代码)
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._base_data.get_adj_factor(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local
)
def get_backward_factor(self, codes: List[str],
is_local: Optional[bool] = None) -> pd.DataFrame:
"""
获取后复权因子
Args:
codes: 股票代码列表
is_local: 是否使用本地缓存
Returns:
DataFrame (index: 日期, columns: 股票代码)
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._base_data.get_backward_factor(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local
)
# ==================== 历史行情数据接口 ====================
def get_kline(self,
codes: Union[str, List[str]],
start_date: Union[str, int, date],
end_date: Union[str, int, date],
period: Period = Period.DAILY) -> Dict[str, pd.DataFrame]:
"""
获取历史K线数据
Args:
codes: 证券代码或代码列表
start_date: 开始日期
end_date: 结束日期
period: K线周期
Returns:
Dict[代码, DataFrame]DataFrame包含OHLCV等字段
"""
self._check_login()
if isinstance(codes, str):
codes = [codes]
start_date = self._format_date(start_date)
end_date = self._format_date(end_date)
print(f"正在获取K线数据: 代码={codes}, 日期范围={start_date}~{end_date}, 周期={period.value}")
# 获取K线数据
kline_dict = self._market_data.query_kline(
code_list=codes,
begin_date=20240302,
end_date=20240306,
period=self._ad.constant.Period.day.value)
print(f"成功获取K线数据: {len(kline_dict)} 只证券")
print(f"示例数据:\n{kline_dict[codes[0]].head()}")
return kline_dict
def get_snapshot(self,
codes: Union[str, List[str]],
start_date: Union[str, int, date],
end_date: Union[str, int, date]) -> Dict[str, pd.DataFrame]:
"""
获取历史快照数据tick级别
Args:
codes: 证券代码或代码列表
start_date: 开始日期
end_date: 结束日期
Returns:
Dict[代码, DataFrame]
"""
self._check_login()
if isinstance(codes, str):
codes = [codes]
start_date = self._format_date(start_date)
end_date = self._format_date(end_date)
snapshot_dict = self._market_data.query_snapshot(
code_list=codes,
begin_date=start_date,
end_date=end_date
)
return snapshot_dict
# ==================== 财务数据接口 ====================
def get_balance_sheet(self,
codes: List[str],
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> Dict[str, pd.DataFrame]:
"""
获取资产负债表
Args:
codes: 股票代码列表
start_date: 开始报告期
end_date: 结束报告期
is_local: 是否使用本地缓存
Returns:
Dict[代码, DataFrame]
"""
return self._get_financial_data(
'get_balance_sheet', codes, start_date, end_date, is_local
)
def get_cash_flow(self,
codes: List[str],
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> Dict[str, pd.DataFrame]:
"""
获取现金流量表
Args:
codes: 股票代码列表
start_date: 开始报告期
end_date: 结束报告期
is_local: 是否使用本地缓存
Returns:
Dict[代码, DataFrame]
"""
return self._get_financial_data(
'get_cash_flow', codes, start_date, end_date, is_local
)
def get_income_statement(self,
codes: List[str],
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> Dict[str, pd.DataFrame]:
"""
获取利润表
Args:
codes: 股票代码列表
start_date: 开始报告期
end_date: 结束报告期
is_local: 是否使用本地缓存
Returns:
Dict[代码, DataFrame]
"""
return self._get_financial_data(
'get_income', codes, start_date, end_date, is_local
)
def get_profit_express(self,
codes: List[str],
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> pd.DataFrame:
"""
获取业绩快报
Args:
codes: 股票代码列表
start_date: 开始报告期
end_date: 结束报告期
is_local: 是否使用本地缓存
Returns:
DataFrame
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._info_data.get_profit_express(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local,
begin_date=self._format_date(start_date) if start_date else None,
end_date=self._format_date(end_date) if end_date else None
)
def get_profit_notice(self,
codes: List[str],
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> pd.DataFrame:
"""
获取业绩预告
Args:
codes: 股票代码列表
start_date: 开始报告期
end_date: 结束报告期
is_local: 是否使用本地缓存
Returns:
DataFrame
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._info_data.get_profit_notice(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local,
begin_date=self._format_date(start_date) if start_date else None,
end_date=self._format_date(end_date) if end_date else None
)
# ==================== 股东股本数据接口 ====================
def get_top10_shareholders(self,
codes: List[str],
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> pd.DataFrame:
"""
获取十大股东数据
Args:
codes: 股票代码列表
start_date: 开始日期
end_date: 结束日期
is_local: 是否使用本地缓存
Returns:
DataFrame
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._info_data.get_share_holder(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local,
begin_date=self._format_date(start_date) if start_date else None,
end_date=self._format_date(end_date) if end_date else None
)
def get_shareholder_count(self,
codes: List[str],
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> pd.DataFrame:
"""
获取股东户数数据
Args:
codes: 股票代码列表
start_date: 开始日期
end_date: 结束日期
is_local: 是否使用本地缓存
Returns:
DataFrame
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._info_data.get_holder_num(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local,
begin_date=self._format_date(start_date) if start_date else None,
end_date=self._format_date(end_date) if end_date else None
)
def get_equity_structure(self,
codes: List[str],
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> pd.DataFrame:
"""
获取股本结构数据
Args:
codes: 股票代码列表
start_date: 开始日期
end_date: 结束日期
is_local: 是否使用本地缓存
Returns:
DataFrame
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._info_data.get_equity_structure(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local,
begin_date=self._format_date(start_date) if start_date else None,
end_date=self._format_date(end_date) if end_date else None
)
# ==================== 融资融券数据接口 ====================
def get_margin_summary(self,
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> pd.DataFrame:
"""
获取融资融券成交汇总
Args:
start_date: 开始日期
end_date: 结束日期
is_local: 是否使用本地缓存
Returns:
DataFrame
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._info_data.get_margin_summary(
local_path=self.config.local_path,
is_local=is_local,
begin_date=self._format_date(start_date) if start_date else None,
end_date=self._format_date(end_date) if end_date else None
)
def get_margin_detail(self,
codes: List[str],
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> Dict[str, pd.DataFrame]:
"""
获取融资融券交易明细
Args:
codes: 股票代码列表
start_date: 开始日期
end_date: 结束日期
is_local: 是否使用本地缓存
Returns:
Dict[代码, DataFrame]
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._info_data.get_margin_detail(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local,
begin_date=self._format_date(start_date) if start_date else None,
end_date=self._format_date(end_date) if end_date else None
)
# ==================== 交易异动数据接口 ====================
def get_longhu_bang(self,
codes: List[str],
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> pd.DataFrame:
"""
获取龙虎榜数据
Args:
codes: 股票代码列表
start_date: 开始日期
end_date: 结束日期
is_local: 是否使用本地缓存
Returns:
DataFrame
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._info_data.get_long_hu_bang(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local,
begin_date=self._format_date(start_date) if start_date else None,
end_date=self._format_date(end_date) if end_date else None
)
def get_block_trading(self,
codes: List[str],
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> pd.DataFrame:
"""
获取大宗交易数据
Args:
codes: 股票代码列表
start_date: 开始日期
end_date: 结束日期
is_local: 是否使用本地缓存
Returns:
DataFrame
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._info_data.get_block_trading(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local,
begin_date=self._format_date(start_date) if start_date else None,
end_date=self._format_date(end_date) if end_date else None
)
# ==================== 指数数据接口 ====================
def get_index_constituents(self,
codes: List[str],
is_local: Optional[bool] = None) -> Dict[str, pd.DataFrame]:
"""
获取指数成分股
Args:
codes: 指数代码列表
is_local: 是否使用本地缓存
Returns:
Dict[指数代码, DataFrame]
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._info_data.get_index_constituent(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local
)
def get_index_weights(self,
codes: List[str],
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> Dict[str, pd.DataFrame]:
"""
获取指数成分股权重
支持上证50(000016.SH)沪深300(000300.SH)中证500(000905.SH)
中证800(000906.SH)中证1000(000852.SH)
Args:
codes: 指数代码列表
start_date: 开始日期
end_date: 结束日期
is_local: 是否使用本地缓存
Returns:
Dict[指数代码, DataFrame]
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._info_data.get_index_weight(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local,
begin_date=self._format_date(start_date) if start_date else None,
end_date=self._format_date(end_date) if end_date else None
)
# ==================== ETF数据接口 ====================
def get_etf_pcf(self, codes: List[str]) -> Tuple[pd.DataFrame, Dict[str, pd.DataFrame]]:
"""
获取ETF申赎数据
Args:
codes: ETF代码列表
Returns:
(etf_info, etf_constituents)
"""
self._check_login()
return self._base_data.get_etf_pcf(code_list=codes)
def get_fund_share(self,
codes: List[str],
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> Dict[str, pd.DataFrame]:
"""
获取基金份额数据
Args:
codes: ETF代码列表
start_date: 开始日期
end_date: 结束日期
is_local: 是否使用本地缓存
Returns:
Dict[代码, DataFrame]
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._info_data.get_fund_share(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local,
begin_date=self._format_date(start_date) if start_date else None,
end_date=self._format_date(end_date) if end_date else None
)
# ==================== 可转债数据接口 ====================
def get_kzz_issuance(self,
codes: List[str],
is_local: Optional[bool] = None) -> Dict[str, pd.DataFrame]:
"""
获取可转债发行数据
Args:
codes: 可转债代码列表
is_local: 是否使用本地缓存
Returns:
Dict[代码, DataFrame]
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._info_data.get_kzz_issuance(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local
)
# ==================== 辅助方法 ====================
def _check_login(self):
"""检查是否已登录"""
if not self._is_logged_in:
raise RuntimeError("未连接到数据源,请先调用 connect()")
def _format_date(self, d: Union[str, int, date]) -> int:
"""统一日期格式为 YYYYMMDD"""
if isinstance(d, int):
return d
elif isinstance(d, str):
return int(d.replace("-", "").replace("/", ""))
elif isinstance(d, date):
return int(d.strftime("%Y%m%d"))
else:
raise ValueError(f"不支持的日期格式: {d}")
def _get_financial_data(self, method: str, codes: List[str],
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> Dict[str, pd.DataFrame]:
"""通用财务数据获取方法"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
method = getattr(self._info_data, method)
return method(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local,
begin_date=self._format_date(start_date) if start_date else None,
end_date=self._format_date(end_date) if end_date else None
)
# ==================== 便捷函数 ====================
def create_adapter(username: str, password: str, host: str, port: int,
                   local_path: str = "./amazing_data_cache/",
                   use_local_cache: bool = True) -> AmazingDataAdapter:
    """
    Build an adapter from individual settings.

    The adapter is returned unconnected; call ``connect()`` on it before
    requesting data.

    Args:
        username: Account name.
        password: Account password.
        host: Server address.
        port: Server port.
        local_path: Directory used for the local cache.
        use_local_cache: Default for ``is_local`` in data calls.
    Returns:
        A new ``AmazingDataAdapter`` wrapping the assembled configuration.
    """
    # Positional order follows the field order of DataSourceConfig.
    settings = DataSourceConfig(
        username, password, host, port, local_path, use_local_cache
    )
    return AmazingDataAdapter(settings)
# ==================== Usage example ====================
if __name__ == "__main__":
# Running the module directly only prints the usage text below; the snippet
# inside the string is not executed.
print("""
# 使用示例:
# 1. 创建适配器
adapter = create_adapter(
username='your_username',
password='your_password',
host='your_host',
port=your_port
)
# 2. 连接数据源
if adapter.connect():
# 3. 获取沪深A股代码列表
codes = adapter.get_code_list(SecurityType.STOCK_A)
print(f"获取到 {len(codes)} 只股票")
# 4. 获取历史K线数据
kline_data = adapter.get_kline(
codes=['000001.SZ', '600000.SH'],
start_date='20240101',
end_date='20241231',
period=Period.DAILY
)
# 5. 获取财务数据
balance_sheet = adapter.get_balance_sheet(
codes=['000001.SZ', '600000.SH'],
start_date='20240101',
end_date='20241231'
)
# 6. 获取指数成分股
constituents = adapter.get_index_constituents(['000300.SH']) # 沪深300
# 7. 断开连接
adapter.disconnect()
""")

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

@ -0,0 +1,681 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
================================================================================
期货/股票多周期数据获取与技术指标计算脚本
================================================================================
功能介绍
本脚本用于获取期货或股票的多周期K线数据并自动计算技术指标输出JSON格式
可直接用于技术分析Skill的输入
核心功能
1. 多周期数据获取
- 5分钟K线至少50根
- 15分钟K线至少50根
- 30分钟K线至少50根
- 60分钟K线至少50根
- 日K线至少50根
2. 技术指标自动计算每根K线都包含
- MA10: 10/周期移动平均线
- MA20: 20/周期移动平均线
- MACD_DIF: MACD快线
- MACD_DEA: MACD慢线信号线
- MACD_HISTOGRAM: MACD柱状图
3. 输出格式
每个周期为一个数组数组中每项包含
{
"time": "K线时间",
"open": 开盘价,
"high": 最高价,
"low": 最低价,
"close": 收盘价,
"volume": 成交量,
"ma10": MA10值,
"ma20": MA20值,
"macd_dif": DIF值,
"macd_dea": DEA值,
"macd_histogram": 柱状图值
}
使用方法
1. 安装依赖
pip install akshare pandas
2. 运行脚本
# 获取期货数据(默认)
python futures_data_collector.py --symbol SN2504 --type futures
# 获取股票数据
python futures_data_collector.py --symbol 000001 --type stock
python futures_data_collector.py --symbol 600000 --type stock --output stock_data.json
3. 参数说明
--symbol: 代码必填
期货格式品种代码 + 年份 + 月份 SN2504(沪锡)AG2506(沪银)
股票格式6位数字代码 000001(平安银行)600000(浦发银行)
--type: 数据类型可选默认 futures
可选值futures(期货)stock(股票)
--output: 输出文件名可选
默认格式{代码}_{时间戳}.json
常见代码对照
期货
SN2504 - 沪锡上海期货交易所
AG2506 - 沪银上海期货交易所
LC2505 - 碳酸锂广州期货交易所
NI2505 - 沪镍上海期货交易所
股票
000001 - 平安银行
600000 - 浦发银行
000858 - 五粮液
600519 - 贵州茅台
输出示例
{
"symbol": "SN2504",
"current_price": 223500,
"timestamp": "2026-03-07T22:15:00+08:00",
"timeframes": {
"60min": [
{
"time": "2026-03-07 14:00",
"open": 22100,
"high": 22300,
"low": 22050,
"close": 22250,
"volume": 12500,
"ma10": 22180.5,
"ma20": 22000.3,
"macd_dif": 0.0523,
"macd_dea": 0.0312,
"macd_histogram": 0.0422
}
],
"30min": [ ... ],
"15min": [ ... ],
"5min": [ ... ],
"daily": [ ... ]
}
}
注意事项
1. 数据源使用akshare数据可能有延迟或频率限制
2. 分钟数据受交易所限制可能无法获取太多历史数据
3. 如遇数据获取失败请检查合约代码是否正确
4. 脚本会自动过滤掉数据不足的周期
作者OpenClaw Assistant
日期2026-03-07
================================================================================
"""
import akshare as ak
import pandas as pd
import json
import argparse
import os
from datetime import datetime, timedelta
from typing import Dict, List
import warnings
# Silence all Python warnings for the rest of the process.
warnings.filterwarnings('ignore')
# Clear cache.
# NOTE(review): this only binds an empty dict to an attribute named ``cache``
# on the akshare module; nothing in this file shows akshare reading it --
# confirm it has any effect.
ak.cache = {}
# Output directory: a ``data`` folder next to this script, created at import
# time if it does not exist yet.
DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data')
os.makedirs(DATA_DIR, exist_ok=True)
def calculate_ma(df: pd.DataFrame, periods: List[int] = None) -> pd.DataFrame:
    """Add simple moving averages of ``close`` to *df*.

    One column named ``MA{n}`` is written per window length ``n``.
    ``min_periods=1`` means the first rows average over however many closes
    exist so far instead of being NaN.

    Args:
        df: Frame with a ``close`` column; it is modified in place.
        periods: Window lengths. ``None`` (the default) means ``[10, 20]``.

    Returns:
        The same ``df`` object, with the added columns.
    """
    # A None sentinel replaces the former mutable list default, which was a
    # single list object shared by every call.
    if periods is None:
        periods = [10, 20]
    closes = df['close']
    for window in periods:
        df[f'MA{window}'] = closes.rolling(window=window, min_periods=1).mean()
    return df
def calculate_macd(df: pd.DataFrame, fast: int = 12, slow: int = 26, signal: int = 9) -> pd.DataFrame:
    """Add MACD columns computed from ``close`` to *df*.

    Columns written (in place):
        macd_dif: fast EMA minus slow EMA of ``close``.
        macd_dea: EMA of ``macd_dif`` over ``signal`` periods.
        macd_histogram: ``(macd_dif - macd_dea) * 2``.
        macd_signal: ``'bullish'``, ``'bearish'`` or ``'neutral'`` per row.

    Args:
        df: Frame with a ``close`` column.
        fast: Span of the fast EMA.
        slow: Span of the slow EMA.
        signal: Span of the EMA applied to ``macd_dif``.

    Returns:
        The same ``df`` object, with the added columns.
    """
    closes = df['close']
    fast_ema = closes.ewm(span=fast, adjust=False).mean()
    slow_ema = closes.ewm(span=slow, adjust=False).mean()

    df['macd_dif'] = fast_ema - slow_ema
    df['macd_dea'] = df['macd_dif'].ewm(span=signal, adjust=False).mean()
    df['macd_histogram'] = (df['macd_dif'] - df['macd_dea']) * 2

    def _classify(row):
        # Both conditions must agree; anything else (ties, NaN) is neutral.
        if row['macd_dif'] > row['macd_dea'] and row['macd_histogram'] > 0:
            return 'bullish'
        if row['macd_dif'] < row['macd_dea'] and row['macd_histogram'] < 0:
            return 'bearish'
        return 'neutral'

    df['macd_signal'] = df.apply(_classify, axis=1)
    return df
def get_current_time() -> datetime:
    """Return the current Beijing time (UTC+8), microseconds dropped.

    The result is a naive ``datetime`` (``tzinfo`` removed after the
    conversion), so it can still be compared with naive timestamps. The
    previous implementation returned the host's local time, which is only
    Beijing time when the host clock runs on UTC+8.
    """
    # Local import: the file header only imports ``datetime`` and ``timedelta``.
    from datetime import timezone

    beijing = timezone(timedelta(hours=8))
    return datetime.now(beijing).replace(microsecond=0, tzinfo=None)
def filter_future_data(df: pd.DataFrame, current_time: datetime = None) -> pd.DataFrame:
    """
    Drop rows whose ``datetime`` lies after the cut-off time.

    Args:
        df: Frame to filter. Without a ``datetime`` column it is returned
            untouched; otherwise that column is converted to datetimes
            in place before filtering.
        current_time: Cut-off; defaults to ``get_current_time()``.
    Returns:
        A copy holding only the rows at or before the cut-off (or ``df``
        itself when it has no ``datetime`` column).
    """
    cutoff = get_current_time() if current_time is None else current_time

    if 'datetime' not in df.columns:
        return df

    # Make sure the comparison below works on real datetimes.
    df['datetime'] = pd.to_datetime(df['datetime'])

    rows_before = len(df)
    kept = df.loc[df['datetime'] <= cutoff].copy()
    dropped = rows_before - len(kept)
    if dropped > 0:
        print(f" 过滤了 {dropped} 条未来数据")
    return kept
def extend_night_session_data(df: pd.DataFrame, symbol: str, period: str) -> pd.DataFrame:
    """
    Sort the bars by time and warn when the night session looks incomplete.

    Despite its name this function fetches nothing: it only prints a notice
    when the last bar falls inside the night session (21:00 - 02:30 next
    day) and no bar stamped exactly 02:30 exists.

    Args:
        df: Bars with a ``datetime`` column (converted in place).
        symbol: Contract code (unused).
        period: Minute period (unused).
    Returns:
        ``df`` unchanged when it is empty or lacks ``datetime``; otherwise
        a time-sorted frame with a fresh index.
    """
    if df.empty or 'datetime' not in df.columns:
        return df

    df['datetime'] = pd.to_datetime(df['datetime'])
    df = df.sort_values('datetime').reset_index(drop=True)

    # Decide from the most recent bar whether we are inside the night session.
    latest = df['datetime'].iloc[-1]
    hh, mm = latest.hour, latest.minute
    ends_in_night = hh >= 21 or hh < 2 or (hh == 2 and mm <= 30)
    if not ends_in_night:
        return df

    # A bar stamped 02:30 marks a complete night session.
    stamps = df['datetime'].dt
    if ((stamps.hour == 2) & (stamps.minute == 30)).any():
        return df

    # akshare offers no way to back-fill here, so only a notice is printed.
    print(" 注意: 夜盘数据可能不完整缺少02:30及之前的数据")
    return df
def get_minute_data(symbol: str, period: str) -> pd.DataFrame:
    """
    Fetch minute bars for a futures contract.

    Bars come from ``ak.futures_zh_minute_sina``; price/volume columns are
    coerced to numbers, rows after the current time are removed, and the
    night-session check is run. Any failure is printed and turned into an
    empty frame.

    Args:
        symbol: Contract code, e.g. "SN2504".
        period: Minute period: "5", "15", "30" or "60".
    Returns:
        The bars with a ``datetime`` column, or an empty DataFrame on error.
    """
    numeric_cols = ('open', 'high', 'low', 'close', 'volume')
    try:
        # Taken before the request so the cut-off cannot drift past the data.
        now = get_current_time()

        raw = ak.futures_zh_minute_sina(symbol=symbol, period=period)
        # Only the time column needs a new name; the OHLCV names already match.
        bars = raw.rename(columns={'day': 'datetime'})
        bars = bars.assign(
            **{col: pd.to_numeric(bars[col], errors='coerce') for col in numeric_cols}
        )
        bars['datetime'] = pd.to_datetime(bars['datetime'])

        bars = filter_future_data(bars, now)
        bars = extend_night_session_data(bars, symbol, period)

        # Fewer than 50 bars is only reported; the frame is still returned.
        if len(bars) < 50:
            print(f" 警告: {period}分钟只获取到{len(bars)}根K线建议检查数据源")
        return bars
    except Exception as e:
        print(f" 获取{period}分钟数据失败: {e}")
        return pd.DataFrame()
def get_daily_data(symbol: str, days: int = 60) -> pd.DataFrame:
    """
    Fetch daily bars for a futures contract.

    Bars come from ``ak.futures_zh_daily_sina``; they are coerced to
    numbers, sorted by date, stripped of rows after the current time and
    cut down to the last ``days`` rows. Any failure is printed and turned
    into an empty frame.

    Args:
        symbol: Contract code.
        days: Number of most recent bars to keep.
    Returns:
        The bars with a ``datetime`` column, or an empty DataFrame on error.
    """
    numeric_cols = ('open', 'high', 'low', 'close', 'volume')
    try:
        now = get_current_time()

        raw = ak.futures_zh_daily_sina(symbol=symbol)
        # Only the date column needs a new name; the OHLCV names already match.
        bars = raw.rename(columns={'date': 'datetime'})
        bars = bars.assign(
            **{col: pd.to_numeric(bars[col], errors='coerce') for col in numeric_cols}
        )

        bars['datetime'] = pd.to_datetime(bars['datetime'])
        bars = bars.sort_values('datetime').reset_index(drop=True)

        # Keep today and earlier, then the most recent ``days`` rows.
        bars = filter_future_data(bars, now)
        return bars.tail(days).reset_index(drop=True)
    except Exception as e:
        print(f" 获取日K数据失败: {e}")
        return pd.DataFrame()
def get_stock_minute_data(symbol: str, period: str) -> pd.DataFrame:
    """
    Fetch minute bars for a stock.

    The bare code is prefixed with ``sh`` when it starts with "6" and with
    ``sz`` otherwise, then passed to ``ak.stock_zh_a_minute``. Columns are
    coerced to numbers and rows after the current time are removed. Any
    failure is printed and turned into an empty frame.

    Args:
        symbol: Stock code, e.g. "000001".
        period: Minute period: "5", "15", "30" or "60".
    Returns:
        The bars with a ``datetime`` column, or an empty DataFrame on error.
    """
    numeric_cols = ('open', 'high', 'low', 'close', 'volume')
    try:
        now = get_current_time()

        # stock_zh_a_minute expects symbols such as sh600000 / sz000001.
        exchange = "sh" if symbol.startswith('6') else "sz"
        raw = ak.stock_zh_a_minute(symbol=f"{exchange}{symbol}", period=period)

        # Only the time column needs a new name; the OHLCV names already match.
        bars = raw.rename(columns={'day': 'datetime'})
        bars = bars.assign(
            **{col: pd.to_numeric(bars[col], errors='coerce') for col in numeric_cols}
        )
        bars['datetime'] = pd.to_datetime(bars['datetime'])

        bars = filter_future_data(bars, now)

        # Fewer than 50 bars is only reported; the frame is still returned.
        if len(bars) < 50:
            print(f" 警告: {period}分钟只获取到{len(bars)}根K线建议检查数据源")
        return bars
    except Exception as e:
        print(f" 获取{period}分钟数据失败: {e}")
        return pd.DataFrame()
def get_stock_daily_data(symbol: str, days: int = 60) -> pd.DataFrame:
    """
    Fetch daily bars for a stock.

    Requests ``ak.stock_zh_a_hist`` for a window of ``days * 2`` calendar
    days ending today, renames the Chinese column headers, coerces the
    values to numbers, sorts by date, removes rows after the current time
    and keeps the last ``days`` rows. Any failure is printed and turned
    into an empty frame.

    Args:
        symbol: Stock code, e.g. "000001".
        days: Number of most recent bars to keep.
    Returns:
        The bars with a ``datetime`` column, or an empty DataFrame on error.
    """
    numeric_cols = ('open', 'high', 'low', 'close', 'volume')
    column_names = {
        '日期': 'datetime',
        '开盘': 'open',
        '最高': 'high',
        '最低': 'low',
        '收盘': 'close',
        '成交量': 'volume'
    }
    try:
        now = get_current_time()

        # Ask for twice as many calendar days as bars wanted, to leave room
        # for non-trading days.
        window_end = now.strftime('%Y%m%d')
        window_start = (now - timedelta(days=days*2)).strftime('%Y%m%d')

        raw = ak.stock_zh_a_hist(symbol=symbol, period="daily", start_date=window_start, end_date=window_end)
        bars = raw.rename(columns=column_names)
        bars = bars.assign(
            **{col: pd.to_numeric(bars[col], errors='coerce') for col in numeric_cols}
        )

        bars['datetime'] = pd.to_datetime(bars['datetime'])
        bars = bars.sort_values('datetime').reset_index(drop=True)

        # Keep today and earlier, then the most recent ``days`` rows.
        bars = filter_future_data(bars, now)
        return bars.tail(days).reset_index(drop=True)
    except Exception as e:
        print(f" 获取日K数据失败: {e}")
        return pd.DataFrame()
def process_data(df: pd.DataFrame, timeframe: str) -> List[Dict]:
    """
    Compute indicators and turn the most recent bars into JSON-ready dicts.

    MA and MACD columns are added via ``calculate_ma`` / ``calculate_macd``,
    then at most the last 50 rows are converted. Prices and moving averages
    are rounded to 2 decimals, MACD values to 4.

    Missing values: a NaN price or moving average becomes ``None``, a NaN
    volume or MACD value becomes ``0``. Prices used to be emitted as NaN,
    which ``json.dumps`` writes as the non-standard token ``NaN``.

    Args:
        df: Bars with ``datetime``, ``open``, ``high``, ``low``, ``close``
            and ``volume`` columns.
        timeframe: Timeframe label (currently unused).
    Returns:
        One dict per bar, oldest first; ``[]`` when ``df`` has fewer than
        10 rows.
    """
    if df.empty or len(df) < 10:
        return []

    df = calculate_ma(df)
    df = calculate_macd(df)

    def _price(value):
        """Round to 2 decimals; missing values become None."""
        return None if pd.isna(value) else round(float(value), 2)

    def _macd(value):
        """Round to 4 decimals; missing values become 0."""
        return 0 if pd.isna(value) else round(float(value), 4)

    candles = []
    # tail(50) already returns the whole frame when it has 50 rows or fewer.
    for _, row in df.tail(50).iterrows():
        volume = row['volume']
        candles.append({
            "time": str(row['datetime']),
            "open": _price(row['open']),
            "high": _price(row['high']),
            "low": _price(row['low']),
            "close": _price(row['close']),
            "volume": 0 if pd.isna(volume) else int(volume),
            "ma10": _price(row.get('MA10')),
            "ma20": _price(row.get('MA20')),
            "macd_dif": _macd(row.get('macd_dif')),
            "macd_dea": _macd(row.get('macd_dea')),
            "macd_histogram": _macd(row.get('macd_histogram'))
        })
    return candles
def collect_futures_data(symbol: str) -> Dict:
    """
    Collect 60/30/15/5-minute and daily bars for a futures contract.

    A timeframe is stored only when at least 50 bars were fetched and
    ``process_data`` returned candles. ``current_price`` is the last close
    of the first minute timeframe that succeeds (tried in the order 60, 30,
    15, 5); the daily bars never set it.

    The printed time and the ``timestamp`` field both come from a single
    ``get_current_time()`` reading, so the ``+08:00`` suffix follows the
    same clock as the future-data filter instead of raw ``datetime.now()``.

    Args:
        symbol: Contract code, e.g. "SN2504".
    Returns:
        Dict with ``symbol``, ``type``, ``current_price``, ``timestamp``
        and ``timeframes`` (timeframe name -> list of candle dicts).
    """
    now = get_current_time()
    print(f"\n正在获取期货 {symbol} 的多周期数据...")
    print(f"当前时间: {now.strftime('%Y-%m-%d %H:%M:%S')}")
    print("-" * 50)

    result = {
        "symbol": symbol,
        "type": "futures",
        "current_price": None,
        "timestamp": now.strftime("%Y-%m-%dT%H:%M:%S+08:00"),
        "timeframes": {}
    }

    def _collect(tf_name, fetch, update_price):
        """Fetch one timeframe and store its candles if enough bars arrived."""
        print(f"获取 {tf_name} 数据...")
        try:
            df = fetch()
            if not df.empty and len(df) >= 50:
                candles = process_data(df, tf_name)
                if candles:
                    result["timeframes"][tf_name] = candles
                    # The first successful minute timeframe provides the price.
                    if update_price and result["current_price"] is None:
                        result["current_price"] = candles[-1]["close"]
                    print(f" [OK] 成功获取 {len(candles)} 根K线")
            else:
                print(f" [FAIL] 数据不足或获取失败 (获取到{len(df)}根)")
        except Exception as e:
            print(f" [ERROR] 错误: {e}")

    for tf_name, tf_period in (("60min", "60"), ("30min", "30"), ("15min", "15"), ("5min", "5")):
        # Default argument binds the period now, not when the lambda runs.
        _collect(tf_name, lambda p=tf_period: get_minute_data(symbol, p), True)

    _collect("daily", lambda: get_daily_data(symbol, days=60), False)

    print("-" * 50)
    return result
def collect_stock_data(symbol: str) -> Dict:
    """
    Collect 60/30/15/5-minute and daily bars for a stock.

    A timeframe is stored only when at least 50 bars were fetched and
    ``process_data`` returned candles. ``current_price`` is the last close
    of the first minute timeframe that succeeds (tried in the order 60, 30,
    15, 5); the daily bars never set it.

    The printed time and the ``timestamp`` field both come from a single
    ``get_current_time()`` reading, so the ``+08:00`` suffix follows the
    same clock as the future-data filter instead of raw ``datetime.now()``.

    Args:
        symbol: Stock code, e.g. "000001".
    Returns:
        Dict with ``symbol``, ``type``, ``current_price``, ``timestamp``
        and ``timeframes`` (timeframe name -> list of candle dicts).
    """
    now = get_current_time()
    print(f"\n正在获取股票 {symbol} 的多周期数据...")
    print(f"当前时间: {now.strftime('%Y-%m-%d %H:%M:%S')}")
    print("-" * 50)

    result = {
        "symbol": symbol,
        "type": "stock",
        "current_price": None,
        "timestamp": now.strftime("%Y-%m-%dT%H:%M:%S+08:00"),
        "timeframes": {}
    }

    def _collect(tf_name, fetch, update_price):
        """Fetch one timeframe and store its candles if enough bars arrived."""
        print(f"获取 {tf_name} 数据...")
        try:
            df = fetch()
            if not df.empty and len(df) >= 50:
                candles = process_data(df, tf_name)
                if candles:
                    result["timeframes"][tf_name] = candles
                    # The first successful minute timeframe provides the price.
                    if update_price and result["current_price"] is None:
                        result["current_price"] = candles[-1]["close"]
                    print(f" [OK] 成功获取 {len(candles)} 根K线")
            else:
                print(f" [FAIL] 数据不足或获取失败 (获取到{len(df)}根)")
        except Exception as e:
            print(f" [ERROR] 错误: {e}")

    for tf_name, tf_period in (("60min", "60"), ("30min", "30"), ("15min", "15"), ("5min", "5")):
        # Default argument binds the period now, not when the lambda runs.
        _collect(tf_name, lambda p=tf_period: get_stock_minute_data(symbol, p), True)

    _collect("daily", lambda: get_stock_daily_data(symbol, days=60), False)

    print("-" * 50)
    return result
def main():
    """
    Command-line entry point.

    Parses ``--symbol``, ``--type`` and ``--output``, collects the data for
    the requested instrument, prints it as JSON and writes the same JSON to
    a file under ``DATA_DIR``. When no timeframe could be collected it
    prints example codes and returns without writing anything.
    """
    parser = argparse.ArgumentParser(description='期货/股票多周期数据获取与技术指标计算')
    parser.add_argument('--symbol', type=str, required=True,
                        help='代码,期货如 SN2504(沪锡), 股票如 000001(平安银行)')
    parser.add_argument('--type', type=str, default='futures', choices=['futures', 'stock'],
                        help='数据类型futures(期货)、stock(股票),默认为 futures')
    parser.add_argument('--output', type=str, default=None,
                        help='输出JSON文件名默认为 代码_时间戳.json')
    args = parser.parse_args()

    wants_stock = args.type == 'stock'
    collector = collect_stock_data if wants_stock else collect_futures_data
    data = collector(args.symbol)

    # Nothing collected: show example codes for the requested type and stop.
    if not data["timeframes"]:
        print("\n错误: 未能获取到任何数据,请检查代码是否正确")
        if wants_stock:
            hints = (
                "常见股票代码示例:",
                " 000001 - 平安银行",
                " 600000 - 浦发银行",
                " 000858 - 五粮液",
                " 600519 - 贵州茅台",
            )
        else:
            hints = (
                "常见期货合约代码示例:",
                " SN2504 - 沪锡2504",
                " AG2506 - 沪银2506",
                " LC2505 - 碳酸锂2505",
                " NI2505 - 沪镍2505",
            )
        for line in hints:
            print(line)
        return

    banner = "=" * 60

    # Echo the JSON on the console.
    print("\n" + banner)
    print("JSON 输出:")
    print(banner)
    json_output = json.dumps(data, ensure_ascii=False, indent=2)
    print(json_output)

    # Output always goes under DATA_DIR, whether or not a name was given.
    if args.output:
        target_name = args.output
    else:
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        target_name = f"{data['symbol']}_{stamp}.json"
    filename = os.path.join(DATA_DIR, target_name)

    with open(filename, 'w', encoding='utf-8') as f:
        f.write(json_output)

    print("\n" + banner)
    print(f"[OK] 数据已保存到: {filename}")
    print(f"[OK] 共获取 {len(data['timeframes'])} 个周期数据")
    print(banner)
if __name__ == "__main__":
main()

@ -0,0 +1,681 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
================================================================================
期货/股票多周期数据获取与技术指标计算脚本
================================================================================
功能介绍
本脚本用于获取期货或股票的多周期K线数据并自动计算技术指标输出JSON格式
可直接用于技术分析Skill的输入
核心功能
1. 多周期数据获取
- 5分钟K线至少50根
- 15分钟K线至少50根
- 30分钟K线至少50根
- 60分钟K线至少50根
- 日K线至少50根
2. 技术指标自动计算每根K线都包含
- MA10: 10/周期移动平均线
- MA20: 20/周期移动平均线
- MACD_DIF: MACD快线
- MACD_DEA: MACD慢线信号线
- MACD_HISTOGRAM: MACD柱状图
3. 输出格式
每个周期为一个数组数组中每项包含
{
"time": "K线时间",
"open": 开盘价,
"high": 最高价,
"low": 最低价,
"close": 收盘价,
"volume": 成交量,
"ma10": MA10值,
"ma20": MA20值,
"macd_dif": DIF值,
"macd_dea": DEA值,
"macd_histogram": 柱状图值
}
使用方法
1. 安装依赖
pip install akshare pandas
2. 运行脚本
# 获取期货数据(默认)
python futures_data_collector.py --symbol SN2504 --type futures
# 获取股票数据
python futures_data_collector.py --symbol 000001 --type stock
python futures_data_collector.py --symbol 600000 --type stock --output stock_data.json
3. 参数说明
--symbol: 代码必填
期货格式品种代码 + 年份 + 月份 SN2504(沪锡)AG2506(沪银)
股票格式6位数字代码 000001(平安银行)600000(浦发银行)
--type: 数据类型可选默认 futures
可选值futures(期货)stock(股票)
--output: 输出文件名可选
默认格式{代码}_{时间戳}.json
常见代码对照
期货
SN2504 - 沪锡上海期货交易所
AG2506 - 沪银上海期货交易所
LC2505 - 碳酸锂广州期货交易所
NI2505 - 沪镍上海期货交易所
股票
000001 - 平安银行
600000 - 浦发银行
000858 - 五粮液
600519 - 贵州茅台
输出示例
{
"symbol": "SN2504",
"current_price": 223500,
"timestamp": "2026-03-07T22:15:00+08:00",
"timeframes": {
"60min": [
{
"time": "2026-03-07 14:00",
"open": 22100,
"high": 22300,
"low": 22050,
"close": 22250,
"volume": 12500,
"ma10": 22180.5,
"ma20": 22000.3,
"macd_dif": 0.0523,
"macd_dea": 0.0312,
"macd_histogram": 0.0422
}
],
"30min": [ ... ],
"15min": [ ... ],
"5min": [ ... ],
"daily": [ ... ]
}
}
注意事项
1. 数据源使用akshare数据可能有延迟或频率限制
2. 分钟数据受交易所限制可能无法获取太多历史数据
3. 如遇数据获取失败请检查合约代码是否正确
4. 脚本会自动过滤掉数据不足的周期
作者OpenClaw Assistant
日期2026-03-07
================================================================================
"""
import akshare as ak
import pandas as pd
import json
import argparse
import os
from datetime import datetime, timedelta
from typing import Dict, List
import warnings
# Silence all Python warnings for the rest of the process.
warnings.filterwarnings('ignore')
# Clear cache.
# NOTE(review): this only binds an empty dict to an attribute named ``cache``
# on the akshare module; nothing in this file shows akshare reading it --
# confirm it has any effect.
ak.cache = {}
# Output directory: a ``data`` folder next to this script, created at import
# time if it does not exist yet.
DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data')
os.makedirs(DATA_DIR, exist_ok=True)
def calculate_ma(df: pd.DataFrame, periods: List[int] = None) -> pd.DataFrame:
    """Add simple moving averages of ``close`` to *df*.

    One column named ``MA{n}`` is written per window length ``n``.
    ``min_periods=1`` means the first rows average over however many closes
    exist so far instead of being NaN.

    Args:
        df: Frame with a ``close`` column; it is modified in place.
        periods: Window lengths. ``None`` (the default) means ``[10, 20]``.

    Returns:
        The same ``df`` object, with the added columns.
    """
    # A None sentinel replaces the former mutable list default, which was a
    # single list object shared by every call.
    if periods is None:
        periods = [10, 20]
    closes = df['close']
    for window in periods:
        df[f'MA{window}'] = closes.rolling(window=window, min_periods=1).mean()
    return df
def calculate_macd(df: pd.DataFrame, fast: int = 12, slow: int = 26, signal: int = 9) -> pd.DataFrame:
    """Add MACD columns computed from the close price to ``df``.

    Columns written (``df`` is modified in place and also returned):
        macd_dif: fast EMA of ``close`` minus slow EMA of ``close``.
        macd_dea: EMA of ``macd_dif`` over ``signal`` bars.
        macd_histogram: ``(macd_dif - macd_dea) * 2``.
        macd_signal: ``'bullish'`` when DIF is above DEA and the histogram is
            positive, ``'bearish'`` when DIF is below DEA and the histogram is
            negative, otherwise ``'neutral'``.

    Args:
        df: K-line frame with a numeric ``close`` column.
        fast: Span of the fast EMA.
        slow: Span of the slow EMA.
        signal: Span of the EMA applied to DIF.

    Returns:
        The same ``df`` object with the four columns above.
    """
    close = df['close']
    ema_fast = close.ewm(span=fast, adjust=False).mean()
    ema_slow = close.ewm(span=slow, adjust=False).mean()
    df['macd_dif'] = ema_fast - ema_slow
    df['macd_dea'] = df['macd_dif'].ewm(span=signal, adjust=False).mean()
    df['macd_histogram'] = (df['macd_dif'] - df['macd_dea']) * 2

    # Label with whole-column comparisons instead of a per-row apply: this
    # avoids a Python call per bar and still produces a plain Series when the
    # frame has no rows. Rows where a comparison involves NaN stay 'neutral'.
    bullish = (df['macd_dif'] > df['macd_dea']) & (df['macd_histogram'] > 0)
    bearish = (df['macd_dif'] < df['macd_dea']) & (df['macd_histogram'] < 0)
    labels = pd.Series('neutral', index=df.index, dtype=object)
    labels[bullish] = 'bullish'
    labels[bearish] = 'bearish'
    df['macd_signal'] = labels
    return df
def get_current_time() -> datetime:
    """Return the current Beijing time (UTC+8) with microseconds removed.

    The wall clock is read in a fixed UTC+8 zone rather than the machine's
    local zone, so the result is Beijing time wherever the script runs. The
    timezone info is then stripped, so the returned value is a naive datetime
    and can still be compared with other naive datetimes.

    Returns:
        Naive ``datetime`` holding the current UTC+8 wall-clock time.
    """
    # Local import: the module-level import only brings in datetime and timedelta.
    from datetime import timezone

    beijing = timezone(timedelta(hours=8))
    return datetime.now(beijing).replace(tzinfo=None, microsecond=0)
def filter_future_data(df: pd.DataFrame, current_time: datetime = None) -> pd.DataFrame:
    """Drop rows whose ``datetime`` is later than ``current_time``.

    Args:
        df: Frame that may contain a ``datetime`` column. When the column is
            present it is converted to datetime dtype in place on ``df``.
        current_time: Cut-off instant; ``get_current_time()`` is used when it
            is not given.

    Returns:
        ``df`` itself when it has no ``datetime`` column; otherwise a copy
        holding only the rows at or before the cut-off.
    """
    cutoff = current_time if current_time is not None else get_current_time()

    if 'datetime' not in df.columns:
        return df

    # Normalise the column so the comparison below works on real timestamps.
    df['datetime'] = pd.to_datetime(df['datetime'])

    not_in_future = df['datetime'] <= cutoff
    kept = df.loc[not_in_future].copy()

    dropped = len(df) - len(kept)
    if dropped > 0:
        print(f" 过滤了 {dropped} 条未来数据")
    return kept
def extend_night_session_data(df: pd.DataFrame, symbol: str, period: str) -> pd.DataFrame:
    """Check whether the night-session bars look complete and warn if not.

    The night session is taken to run from 21:00 to 02:30 of the next day.
    No data is fetched here: the frame is only sorted, inspected, and a notice
    is printed when the latest bar falls in that window but no bar stamped
    02:30 exists anywhere in the frame.

    Args:
        df: K-line frame; expected to carry a ``datetime`` column.
        symbol: Contract code (not used by the check).
        period: Bar period (not used by the check).

    Returns:
        ``df`` untouched when it is empty or has no ``datetime`` column;
        otherwise the frame sorted by ``datetime`` with a fresh index.
    """
    if df.empty or 'datetime' not in df.columns:
        return df

    df['datetime'] = pd.to_datetime(df['datetime'])
    df = df.sort_values('datetime').reset_index(drop=True)

    # Only the most recent bar decides whether the night-session check applies.
    latest = df['datetime'].iloc[-1]
    hour, minute = latest.hour, latest.minute
    in_night_window = hour >= 21 or hour < 2 or (hour == 2 and minute <= 30)
    if not in_night_window:
        return df

    # A bar stamped exactly 02:30 is treated as proof the session is complete.
    reaches_close = any(ts.hour == 2 and ts.minute == 30 for ts in df['datetime'])
    if not reaches_close:
        # Nothing can be back-filled from here, so just tell the user.
        print(" 注意: 夜盘数据可能不完整缺少02:30及之前的数据")
    return df
def get_minute_data(symbol: str, period: str) -> pd.DataFrame:
    """Fetch minute K-lines for a futures contract, without future-dated bars.

    Args:
        symbol: Contract code, e.g. "SN2504".
        period: Minute period as a string: "5", "15", "30" or "60".

    Returns:
        Frame with ``datetime`` plus numeric open/high/low/close/volume
        columns. Any failure is reported on stdout and yields an empty frame.
    """
    try:
        # Captured before the request so it can serve as the future-data cut-off.
        now = get_current_time()

        bars = ak.futures_zh_minute_sina(symbol=symbol, period=period)
        # The price/volume columns keep their names; only the time column is renamed.
        bars = bars.rename(columns={'day': 'datetime'})

        for name in ('open', 'high', 'low', 'close', 'volume'):
            # Unparseable values become NaN instead of raising.
            bars[name] = pd.to_numeric(bars[name], errors='coerce')
        bars['datetime'] = pd.to_datetime(bars['datetime'])

        bars = filter_future_data(bars, now)
        bars = extend_night_session_data(bars, symbol, period)

        bar_count = len(bars)
        if bar_count < 50:
            print(f" 警告: {period}分钟只获取到{bar_count}根K线建议检查数据源")
        return bars
    except Exception as e:
        # Best effort: callers treat an empty frame as "no data for this period".
        print(f" 获取{period}分钟数据失败: {e}")
        return pd.DataFrame()
def get_daily_data(symbol: str, days: int = 60) -> pd.DataFrame:
    """Fetch daily K-lines for a futures contract, without future-dated bars.

    Args:
        symbol: Contract code.
        days: Number of most recent bars to keep.

    Returns:
        Frame sorted by ``datetime`` with numeric open/high/low/close/volume
        columns, trimmed to the last ``days`` rows. Any failure is reported on
        stdout and yields an empty frame.
    """
    try:
        # Captured before the request so it can serve as the future-data cut-off.
        now = get_current_time()

        # The full daily history is requested; trimming happens at the end.
        bars = ak.futures_zh_daily_sina(symbol=symbol)
        # The price/volume columns keep their names; only the time column is renamed.
        bars = bars.rename(columns={'date': 'datetime'})

        for name in ('open', 'high', 'low', 'close', 'volume'):
            bars[name] = pd.to_numeric(bars[name], errors='coerce')

        bars['datetime'] = pd.to_datetime(bars['datetime'])
        bars = bars.sort_values('datetime').reset_index(drop=True)

        # Keep only bars dated at or before "now", then the newest `days` of them.
        bars = filter_future_data(bars, now)
        return bars.tail(days).reset_index(drop=True)
    except Exception as e:
        print(f" 获取日K数据失败: {e}")
        return pd.DataFrame()
def get_stock_minute_data(symbol: str, period: str) -> pd.DataFrame:
    """Fetch minute K-lines for a stock, without future-dated bars.

    Args:
        symbol: Stock code such as "000001"; a code that already carries an
            ``sh``/``sz`` prefix (e.g. "sh600000") is accepted as well.
        period: Minute period as a string: "5", "15", "30" or "60".

    Returns:
        Frame with ``datetime`` plus numeric open/high/low/close/volume
        columns. Any failure is reported on stdout and yields an empty frame.
    """
    try:
        # Captured before the request so it can serve as the future-data cut-off.
        current_time = get_current_time()

        # stock_zh_a_minute expects an exchange-prefixed symbol: sh600000 / sz000001.
        code = symbol.strip()
        if code.lower().startswith(('sh', 'sz')):
            # Already prefixed: pass through instead of producing e.g. "szsh600000".
            full_symbol = code.lower()
        elif code.startswith(('5', '6', '9')):
            # Shanghai-listed codes: 6xxxxx A-shares, 5xxxxx funds/ETFs, 9xxxxx B-shares.
            # NOTE(review): Beijing-exchange codes are not distinguished here — confirm
            # whether the data source supports them before adding a mapping.
            full_symbol = f"sh{code}"
        else:
            full_symbol = f"sz{code}"

        df = ak.stock_zh_a_minute(symbol=full_symbol, period=period)
        # The price/volume columns keep their names; only the time column is renamed.
        df = df.rename(columns={'day': 'datetime'})

        for col in ['open', 'high', 'low', 'close', 'volume']:
            # Unparseable values become NaN instead of raising.
            df[col] = pd.to_numeric(df[col], errors='coerce')
        df['datetime'] = pd.to_datetime(df['datetime'])

        df = filter_future_data(df, current_time)

        if len(df) < 50:
            print(f" 警告: {period}分钟只获取到{len(df)}根K线建议检查数据源")
        return df
    except Exception as e:
        # Best effort: callers treat an empty frame as "no data for this period".
        print(f" 获取{period}分钟数据失败: {e}")
        return pd.DataFrame()
def get_stock_daily_data(symbol: str, days: int = 60) -> pd.DataFrame:
    """Fetch daily K-lines for a stock, without future-dated bars.

    Args:
        symbol: Stock code, e.g. "000001".
        days: Number of most recent bars to keep.

    Returns:
        Frame sorted by ``datetime`` with numeric open/high/low/close/volume
        columns, trimmed to the last ``days`` rows. Any failure is reported on
        stdout and yields an empty frame.
    """
    try:
        now = get_current_time()

        # Request twice as many calendar days as bars wanted, so that
        # non-trading days still leave enough rows.
        window_end = now.strftime('%Y%m%d')
        window_start = (now - timedelta(days=days * 2)).strftime('%Y%m%d')

        bars = ak.stock_zh_a_hist(
            symbol=symbol,
            period="daily",
            start_date=window_start,
            end_date=window_end,
        )

        # Map the source's Chinese column headers onto the common OHLCV names.
        column_map = {
            '日期': 'datetime',
            '开盘': 'open',
            '最高': 'high',
            '最低': 'low',
            '收盘': 'close',
            '成交量': 'volume',
        }
        bars = bars.rename(columns=column_map)

        for name in ('open', 'high', 'low', 'close', 'volume'):
            bars[name] = pd.to_numeric(bars[name], errors='coerce')

        bars['datetime'] = pd.to_datetime(bars['datetime'])
        bars = bars.sort_values('datetime').reset_index(drop=True)

        # Keep only bars dated at or before "now", then the newest `days` of them.
        bars = filter_future_data(bars, now)
        return bars.tail(days).reset_index(drop=True)
    except Exception as e:
        print(f" 获取日K数据失败: {e}")
        return pd.DataFrame()
def process_data(df: pd.DataFrame, timeframe: str) -> List[Dict]:
    """Compute indicators for one timeframe and format the bars as dicts.

    Args:
        df: K-line frame with ``datetime``, ``open``, ``high``, ``low``,
            ``close`` and ``volume`` columns. Indicator columns are added to
            it by ``calculate_ma`` / ``calculate_macd``.
        timeframe: Name of the timeframe (not used in the computation).

    Returns:
        At most the 50 most recent bars, oldest first, each as a dict with
        ``time``, OHLC, ``volume``, ``ma10``, ``ma20`` and the three MACD
        values. An empty list is returned when fewer than 10 bars exist.
        Missing prices and moving averages are reported as ``None``; missing
        volume and MACD values as ``0``.
    """
    if df.empty or len(df) < 10:
        return []

    df = calculate_ma(df)
    df = calculate_macd(df)

    def _rounded(value, digits, missing=None):
        # NaN must not leak into the result: json.dumps would write it as the
        # bare token NaN, which strict JSON parsers reject.
        return missing if pd.isna(value) else round(float(value), digits)

    candles = []
    # tail(50) already returns the whole frame when it has 50 rows or fewer.
    for _, row in df.tail(50).iterrows():
        candles.append({
            "time": str(row['datetime']),
            "open": _rounded(row['open'], 2),
            "high": _rounded(row['high'], 2),
            "low": _rounded(row['low'], 2),
            "close": _rounded(row['close'], 2),
            "volume": 0 if pd.isna(row['volume']) else int(row['volume']),
            "ma10": _rounded(row.get('MA10'), 2),
            "ma20": _rounded(row.get('MA20'), 2),
            "macd_dif": _rounded(row.get('macd_dif'), 4, 0),
            "macd_dea": _rounded(row.get('macd_dea'), 4, 0),
            "macd_histogram": _rounded(row.get('macd_histogram'), 4, 0),
        })
    return candles
def _collect_timeframes(symbol: str, kind: str, label: str, fetch_minute, fetch_daily) -> Dict:
    """Gather 60/30/15/5-minute and daily candles for one instrument.

    Args:
        symbol: Instrument code passed to the fetchers.
        kind: Value stored under ``"type"`` in the result.
        label: Instrument-kind word shown in the progress banner.
        fetch_minute: Callable ``(symbol, period) -> DataFrame`` for minute bars.
        fetch_daily: Callable ``(symbol, days=...) -> DataFrame`` for daily bars.

    Returns:
        Dict with ``symbol``, ``type``, ``current_price``, ``timestamp`` and
        ``timeframes``. A timeframe is included only when at least 50 bars
        were fetched and ``process_data`` returned candles for it.
    """
    # Local import: the module-level import only brings in datetime and timedelta.
    from datetime import timezone

    print(f"\n正在获取{label} {symbol} 的多周期数据...")
    print(f"当前时间: {get_current_time().strftime('%Y-%m-%d %H:%M:%S')}")
    print("-" * 50)

    # The timestamp text ends in a literal "+08:00", so the clock has to be
    # read in UTC+8 rather than in whatever zone the machine is set to.
    stamp = datetime.now(timezone(timedelta(hours=8))).strftime("%Y-%m-%dT%H:%M:%S+08:00")
    result = {
        "symbol": symbol,
        "type": kind,
        "current_price": None,
        "timestamp": stamp,
        "timeframes": {}
    }

    minute_periods = [
        ("60min", "60"),
        ("30min", "30"),
        ("15min", "15"),
        ("5min", "5")
    ]
    for tf_name, tf_period in minute_periods:
        print(f"获取 {tf_name} 数据...")
        try:
            df = fetch_minute(symbol, tf_period)
            if not df.empty and len(df) >= 50:
                candles = process_data(df, tf_name)
                if candles:
                    result["timeframes"][tf_name] = candles
                    # The first timeframe that yields candles supplies the current price.
                    if result["current_price"] is None:
                        result["current_price"] = candles[-1]["close"]
                    print(f" [OK] 成功获取 {len(candles)} 根K线")
            else:
                print(f" [FAIL] 数据不足或获取失败 (获取到{len(df)}根)")
        except Exception as e:
            # One failing timeframe must not abort the remaining ones.
            print(f" [ERROR] 错误: {e}")

    print("获取 daily 数据...")
    try:
        df_daily = fetch_daily(symbol, days=60)
        if not df_daily.empty and len(df_daily) >= 50:
            candles = process_data(df_daily, "daily")
            if candles:
                result["timeframes"]["daily"] = candles
                print(f" [OK] 成功获取 {len(candles)} 根K线")
        else:
            print(f" [FAIL] 数据不足或获取失败 (获取到{len(df_daily)}根)")
    except Exception as e:
        print(f" [ERROR] 错误: {e}")

    print("-" * 50)
    return result


def collect_futures_data(symbol: str) -> Dict:
    """Collect multi-timeframe candle data for a futures contract.

    Args:
        symbol: Contract code, e.g. "SN2504".

    Returns:
        JSON-serialisable dict with ``"type": "futures"``; see
        ``_collect_timeframes`` for the layout.
    """
    return _collect_timeframes(symbol, "futures", "期货", get_minute_data, get_daily_data)


def collect_stock_data(symbol: str) -> Dict:
    """Collect multi-timeframe candle data for a stock.

    Args:
        symbol: Stock code, e.g. "000001".

    Returns:
        JSON-serialisable dict with ``"type": "stock"``; see
        ``_collect_timeframes`` for the layout.
    """
    return _collect_timeframes(symbol, "stock", "股票", get_stock_minute_data, get_stock_daily_data)
def main():
    """Command-line entry point: fetch the data, print it as JSON and save it.

    Reads ``--symbol`` (required), ``--type`` (``futures`` or ``stock``) and
    ``--output``. When no timeframe could be collected, a hint list of sample
    codes is printed and nothing is written. Otherwise the JSON is printed and
    written to a file joined onto ``DATA_DIR``.
    """
    parser = argparse.ArgumentParser(description='期货/股票多周期数据获取与技术指标计算')
    parser.add_argument(
        '--symbol', type=str, required=True,
        help='代码,期货如 SN2504(沪锡), 股票如 000001(平安银行)',
    )
    parser.add_argument(
        '--type', type=str, default='futures', choices=['futures', 'stock'],
        help='数据类型futures(期货)、stock(股票),默认为 futures',
    )
    parser.add_argument(
        '--output', type=str, default=None,
        help='输出JSON文件名默认为 代码_时间戳.json',
    )
    args = parser.parse_args()

    wants_stock = args.type == 'stock'
    collector = collect_stock_data if wants_stock else collect_futures_data
    data = collector(args.symbol)

    # Nothing collected: show sample codes for the requested type and stop.
    if not data["timeframes"]:
        print("\n错误: 未能获取到任何数据,请检查代码是否正确")
        if wants_stock:
            hint_lines = (
                "常见股票代码示例:",
                " 000001 - 平安银行",
                " 600000 - 浦发银行",
                " 000858 - 五粮液",
                " 600519 - 贵州茅台",
            )
        else:
            hint_lines = (
                "常见期货合约代码示例:",
                " SN2504 - 沪锡2504",
                " AG2506 - 沪银2506",
                " LC2505 - 碳酸锂2505",
                " NI2505 - 沪镍2505",
            )
        for line in hint_lines:
            print(line)
        return

    rule = "=" * 60

    # Echo the JSON on the console.
    print("\n" + rule)
    print("JSON 输出:")
    print(rule)
    json_output = json.dumps(data, ensure_ascii=False, indent=2)
    print(json_output)

    # A user-supplied file name is also placed under the data directory.
    if args.output:
        target_name = args.output
    else:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        target_name = f"{data['symbol']}_{timestamp}.json"
    filename = os.path.join(DATA_DIR, target_name)

    with open(filename, 'w', encoding='utf-8') as f:
        f.write(json_output)

    print("\n" + rule)
    print(f"[OK] 数据已保存到: {filename}")
    print(f"[OK] 共获取 {len(data['timeframes'])} 个周期数据")
    print(rule)


if __name__ == "__main__":
    main()

@ -0,0 +1,748 @@
"""
AmazingData 数据源适配器 - 完整测试套件
每个接口都有独立的测试用例和入口
"""
import os
import sys
import traceback
from datetime import datetime
from amazing_data_adapter import (
AmazingDataAdapter,
DataSourceConfig,
SecurityType,
Market,
Period,
create_adapter
)
# ============== Configuration ==============
# Connection settings used by every test. Each value is read from an
# AMAZING_DATA_* environment variable when that variable is set; the literals
# below are only fallbacks, so runs without those variables behave as before.
# NOTE(review): account credentials and a server address are committed here as
# fallback values — consider removing them from source control and rotating
# the password.
USERNAME = os.environ.get('AMAZING_DATA_USERNAME', '11200008169')
PASSWORD = os.environ.get('AMAZING_DATA_PASSWORD', '11200008169@2026')
HOST = os.environ.get('AMAZING_DATA_HOST', '140.206.44.234')
PORT = int(os.environ.get('AMAZING_DATA_PORT', '8600'))
LOCAL_PATH = os.environ.get('AMAZING_DATA_LOCAL_PATH', './amazing_data_cache/')
def create_test_adapter() -> AmazingDataAdapter:
    """Build an adapter from the module-level connection settings, with the local cache enabled."""
    settings = {
        'username': USERNAME,
        'password': PASSWORD,
        'host': HOST,
        'port': PORT,
        'local_path': LOCAL_PATH,
        'use_local_cache': True,
    }
    return create_adapter(**settings)
def run_test(test_name: str, test_func):
    """Run one test case against a freshly connected adapter.

    Args:
        test_name: Name shown in the banner.
        test_func: Callable that takes the connected adapter.

    Returns:
        True when the connection succeeded and ``test_func`` raised nothing;
        False when the connection failed or any exception occurred.
    """
    banner = '=' * 60
    print(f"\n{banner}")
    print(f"测试: {test_name}")
    print(banner)

    adapter = create_test_adapter()
    try:
        connected = adapter.connect()
        if connected:
            test_func(adapter)
            print("✅ 测试通过")
            return True
        print("❌ 连接失败")
        return False
    except Exception as e:
        # A failing case is reported with its traceback instead of stopping the run.
        print(f"❌ 测试失败: {e}")
        traceback.print_exc()
        return False
    finally:
        # Runs on every path, including a failed connect.
        adapter.disconnect()
# ============== 基础数据接口测试 ==============
def test_get_code_list(adapter: AmazingDataAdapter):
    """Test: fetch the code list of several security types and print counts with a few samples."""
    print("\n--- 测试获取各类证券代码列表 ---")

    # (label used in the output, security type to query, number of sample codes to show)
    plan = (
        ("沪深A股", SecurityType.STOCK_A, 5),
        ("ETF", SecurityType.ETF, 3),
        ("期货", SecurityType.FUTURE, 3),
        ("可转债", SecurityType.KZZ, 3),
        ("沪深指数", SecurityType.INDEX_A, 3),
    )
    for label, security_type, sample_size in plan:
        codes = adapter.get_code_list(security_type)
        print(f"{label}数量: {len(codes)}")
        if codes:
            print(f"示例代码: {codes[:sample_size]}")
def test_get_code_info(adapter: AmazingDataAdapter):
    """Test: fetch basic security information for A-shares and print a preview."""
    print("\n--- 测试获取证券基本信息 ---")
    info = adapter.get_code_info(SecurityType.STOCK_A)

    leading_columns = info.columns.tolist()[:10]
    print(f"证券信息形状: {info.shape}")
    print(f"列名: {leading_columns}")

    print("\n前3条数据:")
    preview = info.head(3)
    print(preview)
def test_get_trading_calendar(adapter: AmazingDataAdapter):
    """Test: fetch the trading calendar of the SH, SZ and BJ markets and print their sizes."""
    print("\n--- 测试获取交易日历 ---")

    # Shanghai: size plus the five most recent entries.
    shanghai_days = adapter.get_trading_calendar(Market.SH)
    print(f"上海市场交易日数量: {len(shanghai_days)}")
    print(f"最近5个交易日: {shanghai_days[-5:]}")

    # Shenzhen and Beijing: size only.
    for label, market in (("深圳", Market.SZ), ("北京", Market.BJ)):
        trading_days = adapter.get_trading_calendar(market)
        print(f"{label}市场交易日数量: {len(trading_days)}")
def test_get_adj_factor(adapter: AmazingDataAdapter):
    """Test: fetch adjustment factors for two stocks (``is_local=False``) and print a preview."""
    print("\n--- 测试获取复权因子 ---")
    factors = adapter.get_adj_factor(['000001.SZ', '600000.SH'], is_local=False)

    column_names = factors.columns.tolist()
    print(f"复权因子形状: {factors.shape}")
    print(f"列名: {column_names}")

    print("\n前5条数据:")
    print(factors.head())
def test_get_backward_factor(adapter: AmazingDataAdapter):
    """Test: fetch backward adjustment factors for three stocks and print a preview."""
    print("\n--- 测试获取后复权因子 ---")
    factors = adapter.get_backward_factor(['000001.SZ', '600000.SH', '000858.SZ'])

    column_names = factors.columns.tolist()
    print(f"后复权因子形状: {factors.shape}")
    print(f"列名: {column_names}")

    print("\n前5条数据:")
    print(factors.head())
# ============== 历史行情数据接口测试 ==============
def test_get_kline(adapter: AmazingDataAdapter):
    """Test: fetch historical K-lines (daily and 60-minute) for December 2024 and print previews."""
    print("\n--- 测试获取历史K线数据 ---")

    # Use the first three A-share codes as the daily sample.
    sample_codes = adapter.get_code_list(SecurityType.STOCK_A)[:3]

    print("\n>>> 日K线数据")
    daily_bars = adapter.get_kline(
        codes=sample_codes,
        start_date='20241201',
        end_date='20241231',
        period=Period.DAILY,
    )
    # Only the first two codes are printed.
    for code, frame in list(daily_bars.items())[:2]:
        print(f"\n{code}:")
        print(f" 数据条数: {len(frame)}")
        print(f" 列名: {frame.columns.tolist()}")
        if frame.empty:
            continue
        print(f" 最新数据:\n{frame.tail(2)}")

    print("\n>>> 60分钟K线数据")
    hourly_bars = adapter.get_kline(
        codes=['000001.SZ'],
        start_date='20241201',
        end_date='20241231',
        period=Period.MIN60,
    )
    for code, frame in hourly_bars.items():
        print(f"\n{code}:")
        print(f" 数据条数: {len(frame)}")
        if frame.empty:
            continue
        print(f" 前2条数据:\n{frame.head(2)}")
def test_get_snapshot(adapter: AmazingDataAdapter):
    """Test: fetch historical snapshot (tick-level) data for one stock on one day."""
    print("\n--- 测试获取历史快照数据 ---")
    snapshot_data = adapter.get_snapshot(
        codes=['000001.SZ'],
        start_date='20241201',
        end_date='20241201'  # a single day only, to keep the data volume small
    )

    # The line is labelled "shape", so print each frame's shape rather than
    # dumping the whole dict of tick frames to the console.
    shapes = {code: df.shape for code, df in snapshot_data.items()}
    print(f"快照数据形状: {shapes}")

    for code, df in snapshot_data.items():
        print(f"\n{code}:")
        print(f" 数据条数: {len(df)}")
        if not df.empty:
            print(f" 列名: {df.columns.tolist()[:15]}")  # first 15 columns only
            print(f" 前3条数据:\n{df.head(3)}")
        else:
            print(" 数据为空")
# ============== 财务数据接口测试 ==============
def test_get_balance_sheet(adapter: AmazingDataAdapter):
    """Test: fetch balance sheets and print the TOT_SHARE history of the last returned code."""
    print("\n--- 测试获取资产负债表 ---")
    codes = ['000001.SZ', '600000.SH']
    balance_sheet = adapter.get_balance_sheet(
        codes=codes,
        start_date=20250101,
        end_date=20260311
    )
    for code, df in balance_sheet.items():
        print(f"\n{code}:")
        print(f" 数据条数: {len(df)}")
        if not df.empty:
            print(f" 列名(前10): {df.columns.tolist()[:10]}")
            print(f" 数据预览:\n{df.head(2)}")

    # Detail view of the last balance sheet in the result.
    print("\n--- 最后一个资产负债表详情 ---")
    if not balance_sheet:
        # Without this guard, taking the last key of an empty result raises IndexError.
        print("未返回任何资产负债表数据")
        return

    last_code = list(balance_sheet.keys())[-1]
    last_df = balance_sheet[last_code]
    if last_df.empty:
        print(f"{last_code} 的资产负债表为空")
        return

    print(f"股票代码: {last_code}")
    print(f"资产负债表:\n{last_df}")
    if 'TOT_SHARE' not in last_df.columns:
        print(f"\n可用列: {last_df.columns.tolist()}")
        print("未找到 TOT_SHARE 列")
        return

    print("\n日期与总股本对应关系:")
    if 'REPORTING_PERIOD' in last_df.columns:
        for _, row in last_df.iterrows():
            print(f" {row['REPORTING_PERIOD']}: {row['TOT_SHARE']}")
        # Latest reporting period = first row after sorting descending.
        latest_row = last_df.sort_values('REPORTING_PERIOD', ascending=False).iloc[0]
        latest_date = latest_row['REPORTING_PERIOD']
    else:
        # No REPORTING_PERIOD column: fall back to the index label and the last row.
        for idx, row in last_df.iterrows():
            print(f" {idx}: {row['TOT_SHARE']}")
        latest_row = last_df.iloc[-1]
        latest_date = latest_row.name

    latest_tot_share = latest_row['TOT_SHARE']
    print(f"\n>>> 最近报告期 {latest_date} 的总股本: {latest_tot_share}")
def test_get_cash_flow(adapter: AmazingDataAdapter):
    """Test: fetch cash-flow statements for 2024 and print a preview per code."""
    print("\n--- 测试获取现金流量表 ---")
    statements = adapter.get_cash_flow(
        codes=['000001.SZ', '600000.SH'],
        start_date=20240101,
        end_date=20241231,
    )
    for code, frame in statements.items():
        print(f"\n{code}:")
        print(f" 数据条数: {len(frame)}")
        if frame.empty:
            continue
        print(f" 列名(前10): {frame.columns.tolist()[:10]}")
        print(f" 数据预览:\n{frame.head(2)}")
def test_get_income_statement(adapter: AmazingDataAdapter):
    """Test: fetch income statements for 2024 and print a preview per code."""
    print("\n--- 测试获取利润表 ---")
    statements = adapter.get_income_statement(
        codes=['000001.SZ', '600000.SH'],
        start_date=20240101,
        end_date=20241231,
    )
    for code, frame in statements.items():
        print(f"\n{code}:")
        print(f" 数据条数: {len(frame)}")
        if frame.empty:
            continue
        print(f" 列名(前10): {frame.columns.tolist()[:10]}")
        print(f" 数据预览:\n{frame.head(2)}")
def test_get_profit_express(adapter: AmazingDataAdapter):
    """Test: fetch preliminary earnings reports for 2024 and print a preview."""
    print("\n--- 测试获取业绩快报 ---")
    report = adapter.get_profit_express(
        codes=['000001.SZ', '600000.SH', '000858.SZ'],
        start_date=20240101,
        end_date=20241231,
    )
    print(f"数据形状: {report.shape}")
    if report.empty:
        print("数据为空")
        return
    print(f"列名: {report.columns.tolist()}")
    print(f"\n前5条数据:\n{report.head()}")
def test_get_profit_notice(adapter: AmazingDataAdapter):
    """Test: fetch earnings forecasts for 2024 and print a preview."""
    print("\n--- 测试获取业绩预告 ---")
    notices = adapter.get_profit_notice(
        codes=['000001.SZ', '600000.SH'],
        start_date=20240101,
        end_date=20241231,
    )
    print(f"数据形状: {notices.shape}")
    if notices.empty:
        print("数据为空")
        return
    print(f"列名: {notices.columns.tolist()}")
    print(f"\n前5条数据:\n{notices.head()}")
# ============== 股东股本数据接口测试 ==============
def test_get_top10_shareholders(adapter: AmazingDataAdapter):
    """Test: fetch top-10 shareholder data for 2024 and print a preview."""
    print("\n--- 测试获取十大股东数据 ---")
    holders = adapter.get_top10_shareholders(
        codes=['000001.SZ', '600000.SH'],
        start_date=20240101,
        end_date=20241231,
    )
    print(f"数据形状: {holders.shape}")
    if holders.empty:
        print("数据为空")
        return
    print(f"列名: {holders.columns.tolist()}")
    print(f"\n前5条数据:\n{holders.head()}")
def test_get_shareholder_count(adapter: AmazingDataAdapter):
    """Test: fetch shareholder-count data for 2024 and print a preview."""
    print("\n--- 测试获取股东户数数据 ---")
    counts = adapter.get_shareholder_count(
        codes=['000001.SZ', '600000.SH'],
        start_date=20240101,
        end_date=20241231,
    )
    print(f"数据形状: {counts.shape}")
    if counts.empty:
        print("数据为空")
        return
    print(f"列名: {counts.columns.tolist()}")
    print(f"\n前5条数据:\n{counts.head()}")
def test_get_equity_structure(adapter: AmazingDataAdapter):
    """Test: fetch equity-structure data for 2025 and print the TOT_A_SHARE history."""
    print("\n--- 测试获取股本结构数据 ---")
    equity = adapter.get_equity_structure(
        codes=['600000.SH'],
        start_date=20250101,
        end_date=20251231,
    )
    print(f"数据形状: {equity.shape}")
    if equity.empty:
        print("数据为空")
        return

    print(f"列名: {equity.columns.tolist()}")
    print(f"\n前5条数据:\n{equity.head()}")

    if 'TOT_A_SHARE' not in equity.columns:
        print(f"\n可用列: {equity.columns.tolist()}")
        print("未找到 TOT_A_SHARE 列")
        return

    # Print every row's TOT_A_SHARE, keyed by reporting period when available.
    print("\n日期与流通A股(TOT_A_SHARE)对应关系:")
    if 'REPORTING_PERIOD' in equity.columns:
        for _, record in equity.iterrows():
            print(f" {record['REPORTING_PERIOD']}: {record['TOT_A_SHARE']}")
        # Latest reporting period = first row after sorting descending.
        newest = equity.sort_values('REPORTING_PERIOD', ascending=False).iloc[0]
        newest_label = newest['REPORTING_PERIOD']
    else:
        # No REPORTING_PERIOD column: key by index label and take the last row.
        for label, record in equity.iterrows():
            print(f" {label}: {record['TOT_A_SHARE']}")
        newest = equity.iloc[-1]
        newest_label = getattr(newest, 'name', '未知')

    print(f"\n>>> 最近报告期 {newest_label} 的流通A股: {newest['TOT_A_SHARE']}")
# ============== 融资融券数据接口测试 ==============
def test_get_margin_summary(adapter: AmazingDataAdapter):
    """Test: fetch the margin-trading summary for 2024 and print a preview."""
    print("\n--- 测试获取融资融券成交汇总 ---")
    summary = adapter.get_margin_summary(start_date=20240101, end_date=20241231)
    print(f"数据形状: {summary.shape}")
    if summary.empty:
        print("数据为空")
        return
    print(f"列名: {summary.columns.tolist()}")
    print(f"\n前5条数据:\n{summary.head()}")
def test_get_margin_detail(adapter: AmazingDataAdapter):
    """Test: fetch margin-trading details for December 2024 and print a preview per code."""
    print("\n--- 测试获取融资融券交易明细 ---")
    details = adapter.get_margin_detail(
        codes=['000001.SZ', '600000.SH'],
        start_date=20241201,
        end_date=20241231,
    )
    for code, frame in details.items():
        print(f"\n{code}:")
        print(f" 数据条数: {len(frame)}")
        if frame.empty:
            continue
        print(f" 列名: {frame.columns.tolist()[:10]}")
        print(f" 数据预览:\n{frame.head(2)}")
# ============== 交易异动数据接口测试 ==============
def test_get_longhu_bang(adapter: AmazingDataAdapter):
    """Test: fetch Longhu Bang (龙虎榜) data for December 2024 and print a preview."""
    print("\n--- 测试获取龙虎榜数据 ---")
    board = adapter.get_longhu_bang(
        codes=['000001.SZ', '600000.SH'],
        start_date=20241201,
        end_date=20241231,
    )
    print(f"数据形状: {board.shape}")
    if board.empty:
        print("数据为空")
        return
    print(f"列名: {board.columns.tolist()}")
    print(f"\n前5条数据:\n{board.head()}")
def test_get_block_trading(adapter: AmazingDataAdapter):
    """Test: fetch block-trade data for December 2024 and print a preview."""
    print("\n--- 测试获取大宗交易数据 ---")
    trades = adapter.get_block_trading(
        codes=['000001.SZ', '600000.SH'],
        start_date=20241201,
        end_date=20241231,
    )
    print(f"数据形状: {trades.shape}")
    if trades.empty:
        print("数据为空")
        return
    print(f"列名: {trades.columns.tolist()}")
    print(f"\n前5条数据:\n{trades.head()}")
# ============== 指数数据接口测试 ==============
def test_get_index_constituents(adapter: AmazingDataAdapter):
    """Test: fetch the constituents of three indices and print a preview per index."""
    print("\n--- 测试获取指数成分股 ---")
    members_by_index = adapter.get_index_constituents(['000300.SH', '000905.SH', '000016.SH'])
    for code, members in members_by_index.items():
        print(f"\n{code}:")
        print(f" 成分股数量: {len(members)}")
        if members.empty:
            continue
        print(f" 列名: {members.columns.tolist()}")
        print(f" 前5只成分股:\n{members.head()}")
def test_get_index_weights(adapter: AmazingDataAdapter):
    """Test: fetch index constituent weights for December 2024 and print the heaviest entries."""
    print("\n--- 测试获取指数成分股权重 ---")
    weights_by_index = adapter.get_index_weights(
        codes=['000300.SH', '000905.SH'],
        start_date=20241201,
        end_date=20241231,
    )
    for code, frame in weights_by_index.items():
        print(f"\n{code}:")
        print(f" 数据条数: {len(frame)}")
        if frame.empty:
            continue
        print(f" 列名: {frame.columns.tolist()}")
        # Rank by WEIGHT when that column exists; otherwise just show the first rows.
        if 'WEIGHT' in frame.columns:
            top_rows = frame.nlargest(5, 'WEIGHT')
        else:
            top_rows = frame.head()
        print(f" 权重最大的5只股票:\n{top_rows}")
# ============== ETF数据接口测试 ==============
def test_get_etf_pcf(adapter: AmazingDataAdapter):
    """Test: fetch ETF creation/redemption (PCF) data and print the info table and constituents."""
    print("\n--- 测试获取ETF申赎数据 ---")
    basic_info, holdings = adapter.get_etf_pcf(['510050.SH', '510300.SH'])

    print(f"ETF基本信息形状: {basic_info.shape}")
    if not basic_info.empty:
        print(f"列名: {basic_info.columns.tolist()}")
        print(f"\nETF基本信息:\n{basic_info.head()}")

    print(f"\nETF成分股数据数量: {len(holdings)}")
    # Only the first two ETFs are printed.
    for code, frame in list(holdings.items())[:2]:
        print(f"\n{code} 成分股:")
        print(f" 数据条数: {len(frame)}")
        if frame.empty:
            continue
        print(f" 列名: {frame.columns.tolist()[:10]}")
        print(f" 前3条:\n{frame.head(3)}")
def test_get_fund_share(adapter: AmazingDataAdapter):
    """Test: fetch fund-share data for 2024 and print a preview per code."""
    print("\n--- 测试获取基金份额数据 ---")
    shares_by_fund = adapter.get_fund_share(
        codes=['510050.SH', '510300.SH'],
        start_date=20240101,
        end_date=20241231,
    )
    for code, frame in shares_by_fund.items():
        print(f"\n{code}:")
        print(f" 数据条数: {len(frame)}")
        if frame.empty:
            continue
        print(f" 列名: {frame.columns.tolist()}")
        print(f" 数据预览:\n{frame.head()}")
# ============== 可转债数据接口测试 ==============
def test_get_kzz_issuance(adapter: AmazingDataAdapter):
    """Test: fetch convertible-bond issuance data for the first five bond codes."""
    print("\n--- 测试获取可转债发行数据 ---")
    bond_codes = adapter.get_code_list(SecurityType.KZZ)
    print(f"可转债数量: {len(bond_codes)}")
    if not bond_codes:
        # No codes available, so there is nothing to query.
        return

    sample_codes = bond_codes[:5]
    print(f"测试代码: {sample_codes}")
    issuance_by_code = adapter.get_kzz_issuance(sample_codes)

    # Only the first three results are printed.
    for code, frame in list(issuance_by_code.items())[:3]:
        print(f"\n{code}:")
        print(f" 数据条数: {len(frame)}")
        if frame.empty:
            continue
        print(f" 列名: {frame.columns.tolist()[:10]}")
        print(f" 数据预览:\n{frame.head()}")
# ============== 测试入口 ==============
# 所有测试用例映射
# Registry mapping each command-line test name to its test function.
# Insertion order is the order in which run_all_tests() executes the cases.
TEST_CASES = {
    # Basic data interfaces (5)
    'code_list': test_get_code_list,
    'code_info': test_get_code_info,
    'trading_calendar': test_get_trading_calendar,
    'adj_factor': test_get_adj_factor,
    'backward_factor': test_get_backward_factor,
    # Historical market data interfaces (2)
    'kline': test_get_kline,
    'snapshot': test_get_snapshot,
    # Financial data interfaces (5)
    'balance_sheet': test_get_balance_sheet,
    'cash_flow': test_get_cash_flow,
    'income_statement': test_get_income_statement,
    'profit_express': test_get_profit_express,
    'profit_notice': test_get_profit_notice,
    # Shareholder / equity data interfaces (3)
    'top10_shareholders': test_get_top10_shareholders,
    'shareholder_count': test_get_shareholder_count,
    'equity_structure': test_get_equity_structure,
    # Margin trading data interfaces (2)
    'margin_summary': test_get_margin_summary,
    'margin_detail': test_get_margin_detail,
    # Unusual trading activity data interfaces (2)
    'longhu_bang': test_get_longhu_bang,
    'block_trading': test_get_block_trading,
    # Index data interfaces (2)
    'index_constituents': test_get_index_constituents,
    'index_weights': test_get_index_weights,
    # ETF data interfaces (2)
    'etf_pcf': test_get_etf_pcf,
    'fund_share': test_get_fund_share,
    # Convertible bond data interfaces (1)
    'kzz_issuance': test_get_kzz_issuance,
}
def print_usage():
    """Print the usage text: invocation syntax, the test-case names by group, and the special commands."""
    # The text is one literal emitted verbatim; its lines start at column 0 on purpose.
    print("""
使用方法:
python test_amazing_data_adapter.py <test_name>
测试用例列表:
基础数据接口:
code_list - 获取代码列表
code_info - 获取证券信息
trading_calendar - 获取交易日历
adj_factor - 获取复权因子
backward_factor - 获取后复权因子
历史行情数据接口:
kline - 获取历史K线数据
snapshot - 获取历史快照数据
财务数据接口:
balance_sheet - 获取资产负债表
cash_flow - 获取现金流量表
income_statement - 获取利润表
profit_express - 获取业绩快报
profit_notice - 获取业绩预告
股东股本数据接口:
top10_shareholders - 获取十大股东数据
shareholder_count - 获取股东户数数据
equity_structure - 获取股本结构数据
融资融券数据接口:
margin_summary - 获取融资融券成交汇总
margin_detail - 获取融资融券交易明细
交易异动数据接口:
longhu_bang - 获取龙虎榜数据
block_trading - 获取大宗交易数据
指数数据接口:
index_constituents - 获取指数成分股
index_weights - 获取指数成分股权重
ETF数据接口:
etf_pcf - 获取ETF申赎数据
fund_share - 获取基金份额数据
可转债数据接口:
kzz_issuance - 获取可转债发行数据
特殊命令:
all - 运行所有测试
list - 列出所有测试用例
""")
def run_all_tests():
    """Run every registered test case in registry order and print a pass/fail summary."""
    heavy_rule = "=" * 60

    print("\n" + heavy_rule)
    print(f"运行所有测试用例: {list(TEST_CASES)}")
    print(heavy_rule)

    outcomes = {}
    for case_name, case_func in TEST_CASES.items():
        print(f"\n--- 测试: {case_name} ---")
        outcomes[case_name] = run_test(case_name, case_func)
        print(f"\n--- 测试: {case_name} 完成 ---")

    # Summary table: one line per case, then the totals.
    print("\n" + heavy_rule)
    print("测试结果汇总")
    print(heavy_rule)
    for case_name, succeeded in outcomes.items():
        verdict = "✅ 通过" if succeeded else "❌ 失败"
        print(f"{case_name:20s} {verdict}")
    print("-" * 60)

    passed = sum(outcomes.values())
    total = len(outcomes)
    print(f"总计: {passed}/{total} 通过")
if __name__ == "__main__":
    # Command line: exactly one positional word selects what to run.
    cli_args = sys.argv[1:]
    if not cli_args:
        print_usage()
        sys.exit(1)

    test_name = cli_args[0].lower()
    if test_name in ('list', 'all'):
        # Special commands: show the catalogue, or run everything.
        if test_name == 'list':
            print_usage()
        else:
            run_all_tests()
    elif test_name in TEST_CASES:
        run_test(test_name, TEST_CASES[test_name])
    else:
        # Unknown name: report it, show the usage text and exit non-zero.
        print(f"未知测试用例: {test_name}")
        print_usage()
        sys.exit(1)

Binary file not shown.
Loading…
Cancel
Save