You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

236 lines
6.3 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""数据源适配器基类 - 对应Go的adapter/adapter.go"""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, List, Optional
@dataclass
class TickData:
"""Tick数据"""
symbol: str
price: float
volume: int
time: int # Unix时间戳
@dataclass
class KLineData:
"""K线数据"""
symbol: str
time: int # Unix时间戳
open: float
high: float
low: float
close: float
volume: int
amount: float
open_interest: Optional[int] = None
# 股票特有字段
trade_date: Optional[str] = None # 交易日 (YYYY-MM-DD)
is_limit_up: Optional[bool] = None # 是否涨停
is_limit_down: Optional[bool] = None # 是否跌停
total_market_cap: Optional[float] = None # 总市值(元)
float_market_cap: Optional[float] = None # 流通市值(元)
inst_holding_ratio: Optional[float] = None # 机构持仓占比(%
trading_days: Optional[int] = None # 可交易日数(从上市至今)
@dataclass
class SymbolInfo:
"""标的信息"""
symbol_id: str
name: str
exchange: str
underlying: str = "" # 期货品种代码
contract_month: str = ""
list_date: str = ""
delist_date: str = ""
@dataclass
class TradeCalData:
"""交易日历数据"""
date: datetime
is_trading_day: bool
has_night_session: bool = False
# Tick数据回调类型
TickCallback = Callable[[str, TickData], None]
class DataSourceAdapter(ABC):
"""数据源适配器接口"""
@abstractmethod
async def connect(self, config: dict) -> None:
"""建立连接"""
pass
@abstractmethod
async def subscribe_ticks(self, symbols: List[str], callback: TickCallback) -> None:
"""订阅实时Tick"""
pass
@abstractmethod
async def fetch_klines(
self,
symbol: str,
start: str,
end: str,
freq: str
) -> List[KLineData]:
"""拉取历史K线"""
pass
@abstractmethod
async def fetch_symbols(self, asset_type: str) -> List[SymbolInfo]:
"""获取标的列表"""
pass
@abstractmethod
async def fetch_trading_calendar(
self,
exchange: str,
start: str,
end: str
) -> List[TradeCalData]:
"""获取交易日历"""
pass
@abstractmethod
async def health_check(self) -> bool:
"""健康检查"""
pass
@abstractmethod
async def close(self) -> None:
"""关闭连接"""
pass
# ==================== 拆分表结构的新接口 ====================
async def fetch_kline_base(
self,
symbol: str,
start: str,
end: str,
freq: str
) -> List[dict]:
"""获取K线基础数据 (OHLCV)
对应表: stock_klines_1d_base, stock_klines_1m_base 等
Returns:
List[dict] 包含字段:
- symbol: 标的代码
- ts: 时间戳
- trade_date: 交易日期
- open/high/low/close: 开高低收
- volume: 成交量
- amount: 成交额
"""
# 默认实现:使用 fetch_klines 并转换格式
klines = await self.fetch_klines(symbol, start, end, freq)
return [
{
"symbol": k.symbol,
"ts": k.time,
"trade_date": k.trade_date,
"open": k.open,
"high": k.high,
"low": k.low,
"close": k.close,
"volume": k.volume,
"amount": k.amount,
}
for k in klines
]
async def fetch_kline_quote(
self,
symbol: str,
start: str,
end: str
) -> List[dict]:
"""获取日线行情指标数据
对应表: stock_klines_1d_quote
Returns:
List[dict] 包含字段:
- symbol: 标的代码
- trade_date: 交易日期
- change_pct: 涨跌幅
- change_Nd_pct: N日涨跌幅
- ma_N: 均线
- macd_dif/dea/bar: MACD指标
- bias_N: 乖离率
- is_limit_up/down: 涨跌停状态
- is_st: 是否ST
"""
raise NotImplementedError("fetch_kline_quote not implemented")
async def fetch_kline_finance(
self,
symbol: str,
start: str,
end: str
) -> List[dict]:
"""获取日线财务数据
对应表: stock_klines_1d_finance
Returns:
List[dict] 包含字段:
- symbol: 标的代码
- trade_date: 交易日期
- total_market_cap: 总市值
- float_market_cap: 流通市值
- total_shares: 总股本
- float_shares: 流通股本
- inst_holding_shares: 机构持股数量
- inst_holding_ratio: 机构持仓占比
- net_profit: 净利润
- revenue: 营业总收入
- eps: 每股收益
- roe: 净资产收益率
"""
raise NotImplementedError("fetch_kline_finance not implemented")
async def fetch_stock_basic_info(
self,
codes: Optional[List[str]] = None
) -> List[dict]:
"""获取股票基础信息
对应表: stock_symbols
Returns:
List[dict] 包含字段:
- symbol_id: 标的代码
- name: 名称
- exchange: 交易所
- list_date: 上市日期
- list_board: 上市板块
- industry: 行业
- status: 状态
- is_delisted: 是否退市
- delist_date: 退市日期
"""
# 默认实现:使用 fetch_symbols 并转换格式
symbols = await self.fetch_symbols("stock")
return [
{
"symbol_id": s.symbol_id,
"name": s.name,
"exchange": s.exchange,
"list_date": s.list_date,
"delist_date": s.delist_date,
"status": "active" if not s.delist_date else "delisted",
}
for s in symbols
]