You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
523 lines
19 KiB
523 lines
19 KiB
#!/usr/bin/env python3
|
|
"""Add new data fetch methods to amazingdata_adapter.py using English comments"""
|
|
|
|
new_methods = '''
|
|
|
|
# ==================== New Split Table Data Fetch Methods ====================
|
|
|
|
async def fetch_kline_base(
|
|
self,
|
|
symbol: str,
|
|
start: str,
|
|
end: str,
|
|
freq: str
|
|
) -> List[Dict[str, Any]]:
|
|
"""Fetch K-line base data (OHLCV)
|
|
|
|
Corresponding tables: stock_klines_1d_base, stock_klines_1m_base, etc.
|
|
|
|
Returns:
|
|
List[Dict] containing fields:
|
|
- symbol: Symbol code
|
|
- ts: Timestamp
|
|
- trade_date: Trade date
|
|
- open/high/low/close: Price data
|
|
- volume: Trading volume
|
|
- amount: Trading amount
|
|
- adj_factor: Adjustment factor
|
|
"""
|
|
print(f"[amazingdata_adapter fetch_kline_base]Fetching {symbol} {freq} base data...")
|
|
self._check_login()
|
|
|
|
period_map = {
|
|
"1m": self._ad.constant.Period.min1,
|
|
"5m": self._ad.constant.Period.min5,
|
|
"15m": self._ad.constant.Period.min15,
|
|
"30m": self._ad.constant.Period.min30,
|
|
"60m": self._ad.constant.Period.min60,
|
|
"1d": self._ad.constant.Period.day,
|
|
"1w": self._ad.constant.Period.week,
|
|
"1month": self._ad.constant.Period.month,
|
|
}
|
|
period_value = period_map.get(freq, self._ad.constant.Period.day).value
|
|
|
|
loop = asyncio.get_event_loop()
|
|
return await loop.run_in_executor(
|
|
None,
|
|
lambda: self._fetch_kline_base_sync(symbol, start, end, period_value, freq)
|
|
)
|
|
|
|
def _fetch_kline_base_sync(
|
|
self,
|
|
symbol: str,
|
|
start_date: str,
|
|
end_date: str,
|
|
period_value: int,
|
|
freq: str
|
|
) -> List[Dict[str, Any]]:
|
|
"""Sync method to fetch K-line base data"""
|
|
codes = [symbol]
|
|
start_int = self._format_date(start_date)
|
|
end_int = self._format_date(end_date)
|
|
|
|
kline_dict = self._market_data.query_kline(
|
|
code_list=codes,
|
|
begin_date=start_int,
|
|
end_date=end_int,
|
|
period=period_value
|
|
)
|
|
|
|
if symbol not in kline_dict:
|
|
info(f"No kline data found for {symbol}")
|
|
return []
|
|
|
|
df = kline_dict[symbol]
|
|
results = []
|
|
|
|
for _, row in df.iterrows():
|
|
kline_time = row.get('kline_time')
|
|
if pd.isna(kline_time) or kline_time is None:
|
|
continue
|
|
|
|
if isinstance(kline_time, pd.Timestamp):
|
|
ts = int(kline_time.timestamp())
|
|
trade_date = kline_time.strftime('%Y-%m-%d')
|
|
else:
|
|
date_str = str(int(kline_time))
|
|
if len(date_str) != 8:
|
|
continue
|
|
dt = datetime.strptime(date_str, "%Y%m%d")
|
|
ts = int(dt.timestamp())
|
|
trade_date = dt.strftime('%Y-%m-%d')
|
|
|
|
results.append({
|
|
"symbol": symbol,
|
|
"ts": ts,
|
|
"trade_date": trade_date,
|
|
"open": float(row.get('open', 0)),
|
|
"high": float(row.get('high', 0)),
|
|
"low": float(row.get('low', 0)),
|
|
"close": float(row.get('close', 0)),
|
|
"volume": int(row.get('volume', 0)),
|
|
"amount": float(row.get('amount', 0)),
|
|
"adj_factor": float(row.get('adj_factor', 1.0)) if 'adj_factor' in df.columns else 1.0,
|
|
})
|
|
|
|
info(f"Fetched {len(results)} base kline records for {symbol}")
|
|
return results
|
|
|
|
async def fetch_kline_quote(
|
|
self,
|
|
symbol: str,
|
|
start: str,
|
|
end: str
|
|
) -> List[Dict[str, Any]]:
|
|
"""Fetch daily quote indicator data (calculated)
|
|
|
|
Corresponding table: stock_klines_1d_quote
|
|
|
|
Returns:
|
|
List[Dict] containing fields:
|
|
- change_pct: Price change percentage
|
|
- change_Nd_pct: N-day price change
|
|
- ma_N: Moving averages
|
|
- macd_dif/dea/bar: MACD indicators
|
|
- bias_N: Bias ratios
|
|
- is_limit_up/down: Limit up/down status
|
|
- is_st: ST status
|
|
"""
|
|
print(f"[amazingdata_adapter fetch_kline_quote]Calculating {symbol} quote indicators...")
|
|
self._check_login()
|
|
|
|
loop = asyncio.get_event_loop()
|
|
return await loop.run_in_executor(
|
|
None,
|
|
lambda: self._fetch_kline_quote_sync(symbol, start, end)
|
|
)
|
|
|
|
def _fetch_kline_quote_sync(
|
|
self,
|
|
symbol: str,
|
|
start_date: str,
|
|
end_date: str
|
|
) -> List[Dict[str, Any]]:
|
|
"""Sync method to calculate quote indicators"""
|
|
import numpy as np
|
|
|
|
start_dt = datetime.strptime(start_date, "%Y%m%d")
|
|
extended_start = datetime(start_dt.year - 1, start_dt.month, start_dt.day)
|
|
extended_start_str = extended_start.strftime("%Y%m%d")
|
|
|
|
codes = [symbol]
|
|
start_int = self._format_date(extended_start_str)
|
|
end_int = self._format_date(end_date)
|
|
|
|
kline_dict = self._market_data.query_kline(
|
|
code_list=codes,
|
|
begin_date=start_int,
|
|
end_date=end_int,
|
|
period=self._ad.constant.Period.day.value
|
|
)
|
|
|
|
if symbol not in kline_dict:
|
|
return []
|
|
|
|
df = kline_dict[symbol].copy()
|
|
df = df.sort_values('kline_time')
|
|
|
|
try:
|
|
code_info_df = self._base_data.get_code_info(security_type=SecurityType.STOCK_A.value)
|
|
if symbol in code_info_df.index:
|
|
high_limited = float(code_info_df.loc[symbol, 'high_limited']) if 'high_limited' in code_info_df.columns else None
|
|
low_limited = float(code_info_df.loc[symbol, 'low_limited']) if 'low_limited' in code_info_df.columns else None
|
|
else:
|
|
high_limited = low_limited = None
|
|
except:
|
|
high_limited = low_limited = None
|
|
|
|
results = []
|
|
closes = df['close'].values
|
|
|
|
for i, (_, row) in enumerate(df.iterrows()):
|
|
kline_time = row.get('kline_time')
|
|
if pd.isna(kline_time):
|
|
continue
|
|
|
|
if isinstance(kline_time, pd.Timestamp):
|
|
trade_date = kline_time.strftime('%Y-%m-%d')
|
|
else:
|
|
date_str = str(int(kline_time))
|
|
if len(date_str) != 8:
|
|
continue
|
|
dt = datetime.strptime(date_str, "%Y%m%d")
|
|
trade_date = dt.strftime('%Y-%m-%d')
|
|
|
|
close = float(row.get('close', 0))
|
|
|
|
change_pct = None
|
|
if i > 0:
|
|
prev_close = closes[i-1]
|
|
if prev_close > 0:
|
|
change_pct = round((close - prev_close) / prev_close * 100, 4)
|
|
|
|
def calc_n_day_change(n):
|
|
if i >= n and closes[i-n] > 0:
|
|
return round((close - closes[i-n]) / closes[i-n] * 100, 4)
|
|
return None
|
|
|
|
def calc_ma(n):
|
|
if i >= n - 1:
|
|
return round(np.mean(closes[i-n+1:i+1]), 4)
|
|
return None
|
|
|
|
def calc_macd():
|
|
if i < 33:
|
|
return None, None, None
|
|
ema12 = pd.Series(closes[:i+1]).ewm(span=12).mean().iloc[-1]
|
|
ema26 = pd.Series(closes[:i+1]).ewm(span=26).mean().iloc[-1]
|
|
dif = ema12 - ema26
|
|
dea = pd.Series([ema12 - ema26 for _ in range(i+1)]).ewm(span=9).mean().iloc[-1]
|
|
bar = (dif - dea) * 2
|
|
return round(dif, 6), round(dea, 6), round(bar, 6)
|
|
|
|
def calc_bias(n):
|
|
ma = calc_ma(n)
|
|
if ma and ma > 0:
|
|
return round((close - ma) / ma * 100, 4)
|
|
return None
|
|
|
|
is_limit_up = False
|
|
is_limit_down = False
|
|
if high_limited and low_limited and close > 0:
|
|
is_limit_up = close >= high_limited * 0.995
|
|
is_limit_down = close <= low_limited * 1.005
|
|
|
|
macd_dif, macd_dea, macd_bar = calc_macd()
|
|
|
|
if trade_date.replace('-', '') >= start_date:
|
|
results.append({
|
|
"symbol": symbol,
|
|
"trade_date": trade_date,
|
|
"change_pct": change_pct,
|
|
"change_5d_pct": calc_n_day_change(5),
|
|
"change_10d_pct": calc_n_day_change(10),
|
|
"change_20d_pct": calc_n_day_change(20),
|
|
"change_30d_pct": calc_n_day_change(30),
|
|
"change_60d_pct": calc_n_day_change(60),
|
|
"macd_dif": macd_dif,
|
|
"macd_dea": macd_dea,
|
|
"macd_bar": macd_bar,
|
|
"bias_5": calc_bias(5),
|
|
"bias_10": calc_bias(10),
|
|
"bias_20": calc_bias(20),
|
|
"is_limit_up": is_limit_up,
|
|
"is_limit_down": is_limit_down,
|
|
"limit_up_price": round(high_limited, 4) if high_limited else None,
|
|
"limit_down_price": round(low_limited, 4) if low_limited else None,
|
|
"is_st": None,
|
|
"ma_5": calc_ma(5),
|
|
"ma_10": calc_ma(10),
|
|
"ma_20": calc_ma(20),
|
|
"ma_30": calc_ma(30),
|
|
"ma_60": calc_ma(60),
|
|
"ma_120": calc_ma(120),
|
|
"ma_250": calc_ma(250),
|
|
})
|
|
|
|
info(f"Calculated {len(results)} quote indicators for {symbol}")
|
|
return results
|
|
|
|
async def fetch_kline_finance(
|
|
self,
|
|
symbol: str,
|
|
start: str,
|
|
end: str
|
|
) -> List[Dict[str, Any]]:
|
|
"""Fetch daily finance data
|
|
|
|
Corresponding table: stock_klines_1d_finance
|
|
Data sources: get_equity_structure, get_share_holder, get_income
|
|
|
|
Returns:
|
|
List[Dict] containing fields:
|
|
- total_market_cap: Total market cap
|
|
- float_market_cap: Float market cap
|
|
- total_shares: Total shares
|
|
- float_shares: Float shares
|
|
- inst_holding_shares: Institutional holding shares
|
|
- inst_holding_ratio: Institutional holding ratio
|
|
- net_profit: Net profit
|
|
- revenue: Revenue
|
|
- eps: EPS
|
|
- roe: ROE
|
|
"""
|
|
print(f"[amazingdata_adapter fetch_kline_finance]Fetching {symbol} finance data...")
|
|
self._check_login()
|
|
|
|
loop = asyncio.get_event_loop()
|
|
return await loop.run_in_executor(
|
|
None,
|
|
lambda: self._fetch_kline_finance_sync(symbol, start, end)
|
|
)
|
|
|
|
def _fetch_kline_finance_sync(
|
|
self,
|
|
symbol: str,
|
|
start_date: str,
|
|
end_date: str
|
|
) -> List[Dict[str, Any]]:
|
|
"""Sync method to fetch finance data"""
|
|
codes = [symbol]
|
|
start_int = self._format_date(start_date)
|
|
end_int = self._format_date(end_date)
|
|
|
|
results = []
|
|
|
|
try:
|
|
equity_dict = self._info_data.get_equity_structure(
|
|
code_list=codes,
|
|
local_path=self.config.local_path,
|
|
is_local=self.config.use_local_cache
|
|
)
|
|
|
|
equity_data = {}
|
|
if symbol in equity_dict:
|
|
equity_df = equity_dict[symbol]
|
|
for _, row in equity_df.iterrows():
|
|
ann_date = row.get('ANN_DATE')
|
|
if pd.notna(ann_date):
|
|
if isinstance(ann_date, (int, float)):
|
|
date_key = str(int(ann_date))
|
|
else:
|
|
date_key = str(ann_date).replace('-', '').replace('/', '')
|
|
equity_data[date_key] = {
|
|
'total_shares': float(row.get('TOT_A_SHARE', 0)) * 10000 if pd.notna(row.get('TOT_A_SHARE')) else 0,
|
|
'float_shares': float(row.get('FLOAT_A_SHARE', 0)) * 10000 if pd.notna(row.get('FLOAT_A_SHARE')) else 0,
|
|
}
|
|
except Exception as e:
|
|
print(f"[amazingdata_adapter]Failed to get equity structure: {e}")
|
|
equity_data = {}
|
|
|
|
kline_dict = self._market_data.query_kline(
|
|
code_list=codes,
|
|
begin_date=start_int,
|
|
end_date=end_int,
|
|
period=self._ad.constant.Period.day.value
|
|
)
|
|
|
|
if symbol not in kline_dict:
|
|
return []
|
|
|
|
df = kline_dict[symbol]
|
|
|
|
for _, row in df.iterrows():
|
|
kline_time = row.get('kline_time')
|
|
if pd.isna(kline_time):
|
|
continue
|
|
|
|
if isinstance(kline_time, pd.Timestamp):
|
|
trade_date = kline_time.strftime('%Y-%m-%d')
|
|
trade_date_int = int(kline_time.strftime('%Y%m%d'))
|
|
else:
|
|
date_str = str(int(kline_time))
|
|
if len(date_str) != 8:
|
|
continue
|
|
dt = datetime.strptime(date_str, "%Y%m%d")
|
|
trade_date = dt.strftime('%Y-%m-%d')
|
|
trade_date_int = int(date_str)
|
|
|
|
close = float(row.get('close', 0))
|
|
|
|
total_shares = 0
|
|
float_shares = 0
|
|
for date_key in sorted(equity_data.keys(), reverse=True):
|
|
if int(date_key) <= trade_date_int:
|
|
total_shares = equity_data[date_key]['total_shares']
|
|
float_shares = equity_data[date_key]['float_shares']
|
|
break
|
|
|
|
total_market_cap = close * total_shares if total_shares > 0 and close > 0 else None
|
|
float_market_cap = close * float_shares if float_shares > 0 and close > 0 else None
|
|
|
|
results.append({
|
|
"symbol": symbol,
|
|
"trade_date": trade_date,
|
|
"total_market_cap": round(total_market_cap, 2) if total_market_cap else None,
|
|
"float_market_cap": round(float_market_cap, 2) if float_market_cap else None,
|
|
"total_shares": int(total_shares) if total_shares > 0 else None,
|
|
"float_shares": int(float_shares) if float_shares > 0 else None,
|
|
"inst_holding_shares": None,
|
|
"inst_holding_ratio": None,
|
|
"top10_holders_ratio": None,
|
|
"net_profit": None,
|
|
"revenue": None,
|
|
"eps": None,
|
|
"roe": None,
|
|
"trading_days": None,
|
|
})
|
|
|
|
info(f"Fetched {len(results)} finance records for {symbol}")
|
|
return results
|
|
|
|
async def fetch_stock_basic_info(
|
|
self,
|
|
codes: Optional[List[str]] = None
|
|
) -> List[Dict[str, Any]]:
|
|
"""Fetch stock basic info
|
|
|
|
Corresponding table: stock_symbols
|
|
Data source: get_stock_basic
|
|
|
|
Returns:
|
|
List[Dict] containing fields:
|
|
- symbol_id: Symbol code
|
|
- name: Name
|
|
- exchange: Exchange
|
|
- list_date: List date
|
|
- list_board: List board
|
|
- industry: Industry
|
|
- status: Status
|
|
- is_delisted: Is delisted
|
|
- delist_date: Delist date
|
|
"""
|
|
print(f"[amazingdata_adapter fetch_stock_basic_info]Fetching stock basic info...")
|
|
self._check_login()
|
|
|
|
loop = asyncio.get_event_loop()
|
|
return await loop.run_in_executor(
|
|
None,
|
|
lambda: self._fetch_stock_basic_info_sync(codes)
|
|
)
|
|
|
|
def _fetch_stock_basic_info_sync(
|
|
self,
|
|
codes: Optional[List[str]] = None
|
|
) -> List[Dict[str, Any]]:
|
|
"""Sync method to fetch stock basic info"""
|
|
try:
|
|
all_codes = self._base_data.get_code_list(
|
|
security_type=SecurityType.STOCK_A.value
|
|
)
|
|
|
|
if codes:
|
|
all_codes = [c for c in all_codes if c in codes]
|
|
|
|
info_df = self._base_data.get_code_info(
|
|
security_type=SecurityType.STOCK_A.value
|
|
)
|
|
|
|
results = []
|
|
for code in all_codes:
|
|
if ".SH" in code:
|
|
exchange = "SH"
|
|
elif ".SZ" in code:
|
|
exchange = "SZ"
|
|
elif ".BJ" in code:
|
|
exchange = "BJ"
|
|
else:
|
|
exchange = ""
|
|
|
|
name = code
|
|
if code in info_df.index and 'symbol' in info_df.columns:
|
|
name = info_df.loc[code, 'symbol']
|
|
|
|
list_date = None
|
|
try:
|
|
equity_dict = self._info_data.get_equity_structure(
|
|
code_list=[code],
|
|
local_path=self.config.local_path,
|
|
is_local=self.config.use_local_cache
|
|
)
|
|
if code in equity_dict and not equity_dict[code].empty:
|
|
first_record = equity_dict[code].iloc[0]
|
|
ann_date = first_record.get('ANN_DATE')
|
|
if pd.notna(ann_date):
|
|
if isinstance(ann_date, (int, float)):
|
|
list_date = datetime.strptime(str(int(ann_date)), "%Y%m%d")
|
|
else:
|
|
list_date = pd.to_datetime(ann_date)
|
|
except:
|
|
pass
|
|
|
|
results.append({
|
|
"symbol_id": code,
|
|
"name": name,
|
|
"exchange": exchange,
|
|
"list_date": list_date,
|
|
"list_board": None,
|
|
"industry": None,
|
|
"status": "active",
|
|
"is_delisted": False,
|
|
"delist_date": None,
|
|
"is_st": None,
|
|
"total_shares": None,
|
|
"float_shares": None,
|
|
})
|
|
|
|
info(f"Fetched {len(results)} stock basic info records")
|
|
return results
|
|
|
|
except Exception as e:
|
|
error(f"Failed to fetch stock basic info: {e}")
|
|
return []
|
|
'''
|
|
|
|
def main():
|
|
# Read original file
|
|
with open('app/adapters/amazingdata_adapter.py', 'r', encoding='utf-8') as f:
|
|
content = f.read()
|
|
|
|
# Check if already added
|
|
if 'fetch_kline_base' not in content:
|
|
# Append new methods before the last line
|
|
content = content.rstrip() + new_methods
|
|
|
|
with open('app/adapters/amazingdata_adapter.py', 'w', encoding='utf-8') as f:
|
|
f.write(content)
|
|
print('New methods added successfully!')
|
|
else:
|
|
print('Methods already exist.')
|
|
|
|
if __name__ == "__main__":
|
|
main()
|