"""星耀数智(AmazingData)数据源适配器 基于银河证券星耀数智量化平台 SDK 的封装 提供统一、简洁的金融数据获取接口 """ import asyncio from datetime import datetime, date from typing import List, Optional, Dict, Any, Union from dataclasses import dataclass from enum import Enum from httpx import codes import pandas as pd from app.adapters.base import ( DataSourceAdapter, TickData, KLineData, SymbolInfo, TradeCalData, TickCallback ) from app.core.logger import info, error, warning from app.adapters.internal_data_service import InternalDataService class SecurityType(Enum): """证券类型枚举""" STOCK_A = "EXTRA_STOCK_A" # 沪深A股 STOCK_A_SH_SZ = "EXTRA_STOCK_A_SH_SZ" # 沪深A股(沪深) INDEX_A = "EXTRA_INDEX_A" # 沪深指数 ETF = "EXTRA_ETF" # ETF FUTURE = "EXTRA_FUTURE" # 期货 KZZ = "EXTRA_KZZ" # 可转债 GLRA = "EXTRA_GLRA" # 逆回购 HKT = "EXTRA_HKT" # 港股通 ETF_OP = "EXTRA_ETF_OP" # ETF期权 class Market(Enum): """市场枚举""" SH = "SH" # 上海 SZ = "SZ" # 深圳 BJ = "BJ" # 北京 @dataclass class AmazingDataConfig: """星耀数智数据源配置""" username: str password: str host: str port: int local_path: str = "./amazing_data_cache/" use_local_cache: bool = True class AmazingDataAdapter(DataSourceAdapter): """星耀数智(AmazingData)数据源适配器 封装银河证券星耀数智 SDK,提供统一的数据获取接口 """ def __init__(self): self.config: Optional[AmazingDataConfig] = None self._ad = None self._base_data = None self._market_data = None self._info_data = None self._calendar = None self._is_logged_in = False self._connected = False self._internal: Optional[InternalDataService] = None # 内部数据服务 def _check_login(self): """检查是否已登录""" if not self._is_logged_in: raise RuntimeError("未连接到数据源,请先调用 connect()") def _format_date(self, d: Union[str, int, date]) -> int: """统一日期格式为 YYYYMMDD""" if isinstance(d, int): return d elif isinstance(d, str): return int(d.replace("-", "").replace("/", "")) elif isinstance(d, date): return int(d.strftime("%Y%m%d")) else: raise ValueError(f"不支持的日期格式: {d}") def _get_list_date(self, symbol: str) -> Optional[int]: """获取股票上市日期 Returns: 上市日期 (YYYYMMDD格式),如果获取失败返回None """ try: # 方法1:尝试从代码信息中获取 code_info_df = 
self._internal.base.get_code_info(security_type=SecurityType.STOCK_A.value) if symbol in code_info_df.index: # 尝试不同的字段名 for field in ['list_date', 'LIST_DATE', 'listDate', 'founded_date']: if field in code_info_df.columns: list_date_val = code_info_df.loc[symbol, field] if pd.notna(list_date_val): # 处理不同格式的日期 if isinstance(list_date_val, str): return int(list_date_val.replace('-', '')) elif isinstance(list_date_val, (int, float)): return int(list_date_val) elif isinstance(list_date_val, pd.Timestamp): return int(list_date_val.strftime('%Y%m%d')) # 方法2:尝试从历史代码列表获取 try: hist_codes = self._internal.base.get_hist_code_list(security_type=SecurityType.STOCK_A.value) if symbol in hist_codes.index and 'list_date' in hist_codes.columns: list_date_val = hist_codes.loc[symbol, 'list_date'] if pd.notna(list_date_val): if isinstance(list_date_val, str): return int(list_date_val.replace('-', '')) elif isinstance(list_date_val, (int, float)): return int(list_date_val) except: pass return None except Exception as e: print(f"[amazingdata_adapter]获取上市日期失败: {e}") return None async def connect(self, config: dict) -> None: """建立连接""" try: import AmazingData as ad self._ad = ad # 解析配置 # 处理 port:支持字符串或整数,空字符串时使用默认值 port_val = config.get("port", 8080) if isinstance(port_val, str): port_val = int(port_val) if port_val.strip() else 8080 self.config = AmazingDataConfig( username=config.get("username", ""), password=config.get("password", ""), host=config.get("host", ""), port=port_val, local_path=config.get("local_path", "./amazing_data_cache/"), use_local_cache=config.get("use_local_cache", True) ) # 在executor中执行同步的登录操作 loop = asyncio.get_event_loop() await loop.run_in_executor(None, self._do_login) self._connected = True info("成功连接到 AmazingData 星耀数智数据源") except ImportError: error("AmazingData SDK 未安装,请先安装 tgw 和 AmazingData 包") raise RuntimeError("AmazingData SDK not installed") except Exception as e: error(f"连接 AmazingData 失败: {e}") raise def _do_login(self): """执行登录(同步方法)""" # 检查配置是否完整 if 
not self.config.username or not self.config.password or not self.config.host: raise RuntimeError( f"AmazingData 配置不完整: username={self.config.username}, " f"host={self.config.host}, port={self.config.port}. " f"请在 config.json 中配置正确的账号信息" ) print("[amazingdata_adapter]正在登录 AmazingData...") print(f"[amazingdata_adapter]登录用户: {self.config.username}") print(f"[amazingdata_adapter]登录地址: {self.config.host}:{self.config.port}") # 登录 self._ad.login( username=self.config.username, password=self.config.password, host=self.config.host, port=self.config.port ) # 初始化数据类 self._base_data = self._ad.BaseData() self._info_data = self._ad.InfoData() self._calendar = self._base_data.get_calendar() self._market_data = self._ad.MarketData(self._calendar) # 初始化内部数据服务层 self._internal = InternalDataService(self) self._is_logged_in = True print("[amazingdata_adapter]登录成功") async def close(self) -> None: """关闭连接""" if self._is_logged_in and self._ad: try: loop = asyncio.get_event_loop() await loop.run_in_executor( None, lambda: self._ad.logout(self.config.username) ) info("已断开与 AmazingData 的连接") except Exception as e: warning(f"断开连接时出错: {e}") self._is_logged_in = False self._connected = False async def subscribe_ticks(self, symbols: List[str], callback: TickCallback) -> None: """订阅实时Tick(AmazingData暂不支持实时推送模式)""" raise NotImplementedError("AmazingData does not support real-time tick subscription via callback") async def fetch_klines( self, symbol: str, start: str, end: str, freq: str ) -> List[KLineData]: print(f"[amazingdata_adapter fetch_klines]正在拉取 {symbol} 的 {freq} 周期数据...") """拉取历史K线""" self._check_login() # 转换周期格式为 AmazingData SDK 的周期值 period_map = { "1m": self._ad.constant.Period.min1, "5m": self._ad.constant.Period.min5, "15m": self._ad.constant.Period.min15, "30m": self._ad.constant.Period.min30, "60m": self._ad.constant.Period.min60, "1d": self._ad.constant.Period.day, "day": self._ad.constant.Period.day, "D": self._ad.constant.Period.day, "1w": self._ad.constant.Period.week, 
"week": self._ad.constant.Period.week, "W": self._ad.constant.Period.week, "1M": self._ad.constant.Period.month, "month": self._ad.constant.Period.month, "M": self._ad.constant.Period.month, } period_value = period_map.get(freq, self._ad.constant.Period.day).value # 在executor中执行同步查询 loop = asyncio.get_event_loop() return await loop.run_in_executor( None, lambda: self._fetch_klines_sync(symbol, start, end, period_value) ) def _fetch_klines_sync( self, symbol: str, start_date: str, end_date: str, period_value: str ) -> List[KLineData]: print(f"[amazingdata_adapter _fetch_klines_sync]正在同步拉取 {symbol} 的 {period_value} 周期数据...") """同步获取K线数据""" codes = [symbol] start_int = self._format_date(start_date) end_int = self._format_date(end_date) print(f"[amazingdata_adapter _fetch_klines_sync]正在获取K线数据: 代码={codes}, 日期范围={start_date}~{end_date}, 周期={period_value}") # 获取K线数据 - 使用内部接口 kline_dict = self._internal.market.query_kline( code_list=codes, begin_date=start_int, end_date=end_int, period=period_value ) if symbol not in kline_dict: info(f"No kline data found for {symbol}") return [] print(f"[amazingdata_adapter _fetch_klines_sync]获取到 {kline_dict} 的K线数据") # 打印原始数据结构 df = kline_dict[symbol] print(f"[amazingdata_adapter _fetch_klines_sync]获取到 {len(df)} 条K线数据") # ============================================ # 1. 
获取证券基本信息(涨停价、跌停价) # ============================================ print(f"[amazingdata_adapter _fetch_klines_sync]正在获取证券基本信息...") try: code_info_df = self._internal.base.get_code_info(security_type=SecurityType.STOCK_A.value) # 提取当前股票的涨停价和跌停价 if symbol in code_info_df.index: high_limited = float(code_info_df.loc[symbol, 'high_limited']) if 'high_limited' in code_info_df.columns else None low_limited = float(code_info_df.loc[symbol, 'low_limited']) if 'low_limited' in code_info_df.columns else None print(f"[amazingdata_adapter _fetch_klines_sync]涨停价: {high_limited}, 跌停价: {low_limited}") else: high_limited = None low_limited = None print(f"[amazingdata_adapter _fetch_klines_sync]未找到 {symbol} 的涨跌停价格") except Exception as e: print(f"[amazingdata_adapter _fetch_klines_sync]获取证券信息失败: {e}") high_limited = None low_limited = None # ============================================ # 2. 获取股本结构(总股本、流通股) # ============================================ print(f"[amazingdata_adapter _fetch_klines_sync]正在获取股本结构...") try: equity_dict = self._internal.info.get_equity_structure( code_list=codes, local_path=self.config.local_path, is_local=self.config.use_local_cache ) print(f"[amazingdata_adapter _fetch_klines_sync]股本结构: {equity_dict}") if symbol in equity_dict: equity_df = equity_dict[symbol] print(f"[amazingdata_adapter _fetch_klines_sync]获取到 {len(equity_df)} 条股本结构数据") # 获取最近交易日的 TOT_A_SHARE 作为总股本 if not equity_df.empty: # 按日期排序,获取最新数据 if 'ANN_DATE' in equity_df.columns: equity_df = equity_df.sort_values('ANN_DATE') latest_equity = equity_df.iloc[-1] # 获取 TOT_A_SHARE(A股总股本) tot_a_share = float(latest_equity.get('TOT_A_SHARE', 0)) tot_share = tot_a_share * 10000 # 万股转股 # 流通股使用 FLOAT_A_SHARE(流通A股) float_a_share = float(latest_equity.get('FLOAT_A_SHARE', 0)) float_share = float_a_share * 10000 # 万股转股 print(f"[amazingdata_adapter _fetch_klines_sync]最近交易日 TOT_A_SHARE: {tot_a_share} 万股, FLOAT_A_SHARE: {float_a_share} 万股") print(f"[amazingdata_adapter _fetch_klines_sync]换算后总股本: {tot_share} 股, 
流通股: {float_share} 股") else: tot_share = 0 float_share = 0 else: tot_share = 0 float_share = 0 except Exception as e: print(f"[amazingdata_adapter _fetch_klines_sync]获取股本结构失败: {e}") tot_share = 0 float_share = 0 # ============================================ # 3. 获取交易日历和上市日期 # ============================================ print(f"[amazingdata_adapter _fetch_klines_sync]正在获取交易日历...") try: # 获取交易日历 calendar = self._internal.base.get_calendar(market=Market.SH.value) # 获取股票上市日期 list_date = self._get_list_date(symbol) if list_date is None: list_date = min(calendar) if calendar else start_int print(f"[amazingdata_adapter _fetch_klines_sync]上市日期: {list_date}") except Exception as e: print(f"[amazingdata_adapter _fetch_klines_sync]获取交易日历失败: {e}") calendar = [] list_date = start_int # ============================================ # 4. 处理K线数据并补充字段 # ============================================ results = [] for _, row in df.iterrows(): # 从 kline_time 列获取日期 kline_time = row.get('kline_time') if pd.isna(kline_time) or kline_time is None: continue try: # 解析日期 if isinstance(kline_time, pd.Timestamp): ts = int(kline_time.timestamp()) trade_date = kline_time.strftime('%Y-%m-%d') trade_date_int = int(kline_time.strftime('%Y%m%d')) else: date_str = str(int(kline_time)) if len(date_str) != 8: continue dt = datetime.strptime(date_str, "%Y%m%d") ts = int(dt.timestamp()) trade_date = dt.strftime('%Y-%m-%d') trade_date_int = int(date_str) except (ValueError, TypeError) as e: continue # 获取收盘价 close = float(row.get('close', 0)) # ============================================ # 4.1 判断是否涨跌停 # ============================================ is_limit_up = False is_limit_down = False if high_limited and low_limited and close > 0: # 涨停:收盘价 >= 涨停价 * 0.995(允许0.5%误差) is_limit_up = close >= high_limited * 0.995 # 跌停:收盘价 <= 跌停价 * 1.005(允许0.5%误差) is_limit_down = close <= low_limited * 1.005 # ============================================ # 4.2 计算市值 # ============================================ 
print(f"[amazingdata_adapter _fetch_klines_sync]close: {close}, 总股本: {tot_share}, 流通股本: {float_share}") total_market_cap = close * tot_share if tot_share > 0 and close > 0 else None float_market_cap = close * float_share if float_share > 0 and close > 0 else None print(f"[amazingdata_adapter _fetch_klines_sync]总市值: {total_market_cap}, 流通市值: {float_market_cap}") # ============================================ # 4.3 计算可交易日数 # ============================================ trading_days = None if calendar and list_date: # 计算从上市日期到当前交易日的交易日数 trading_days = sum(1 for d in calendar if list_date <= d <= trade_date_int) # 机构持仓占比( AmazingData K线数据可能包含,如果没有则设为None ) inst_holding_ratio = None if 'inst_holding_ratio' in df.columns and pd.notna(row.get('inst_holding_ratio')): inst_holding_ratio = float(row.get('inst_holding_ratio')) results.append(KLineData( symbol=symbol, time=ts, open=float(row.get('open', 0)), high=float(row.get('high', 0)), low=float(row.get('low', 0)), close=close, volume=int(row.get('volume', 0)), amount=float(row.get('amount', 0)), trade_date=trade_date, is_limit_up=is_limit_up, is_limit_down=is_limit_down, total_market_cap=total_market_cap, float_market_cap=float_market_cap, inst_holding_ratio=inst_holding_ratio, trading_days=trading_days )) info(f"Fetched {len(results)} klines with extended fields from AmazingData for {symbol}") return results async def fetch_symbols(self, asset_type: str) -> List[SymbolInfo]: """获取标的列表""" self._check_login() loop = asyncio.get_event_loop() return await loop.run_in_executor( None, lambda: self._fetch_symbols_sync(asset_type) ) def _fetch_symbols_sync(self, asset_type: str) -> List[SymbolInfo]: """同步获取标的列表""" results = [] if asset_type == "stock": # 获取A股代码列表 codes = self._internal.base.get_code_list( security_type=SecurityType.STOCK_A.value ) # 获取代码信息 info_df = self._internal.base.get_code_info( security_type=SecurityType.STOCK_A.value ) # 构建代码到名称的映射 name_map = {} if 'symbol' in info_df.columns: for code in codes: if code 
in info_df.index: name_map[code] = info_df.loc[code, 'symbol'] for code in codes: # 解析交易所 if ".SH" in code: exchange = "SH" elif ".SZ" in code: exchange = "SZ" elif ".BJ" in code: exchange = "BJ" else: exchange = "" results.append(SymbolInfo( symbol_id=code, name=name_map.get(code, code), exchange=exchange )) elif asset_type == "futures": # 获取期货代码列表 codes = self._internal.base.get_future_code_list( security_type=SecurityType.FUTURE.value ) for code in codes: # 解析品种和合约月份 underlying = ''.join([c for c in code if c.isalpha()]).upper() contract_month = ''.join([c for c in code if c.isdigit()]) exchange = self._get_futures_exchange(underlying) ts_code = f"{code.upper()}.{exchange}" results.append(SymbolInfo( symbol_id=ts_code, name=code, exchange=exchange, underlying=underlying, contract_month=contract_month )) info(f"Fetched {len(results)} {asset_type} symbols from AmazingData") return results def _get_futures_exchange(self, underlying: str) -> str: """根据品种代码判断交易所""" # 上海期货交易所 if underlying in ['CU', 'AL', 'ZN', 'PB', 'NI', 'SN', 'AU', 'AG', 'RB', 'HC', 'BU', 'RU', 'FU', 'SP', 'WR', 'SS', 'LU', 'NR']: return 'SHFE' # 大连商品交易所 elif underlying in ['A', 'B', 'M', 'Y', 'P', 'C', 'CS', 'JD', 'LH', 'JM', 'J', 'I', 'FB', 'BB', 'RR', 'PG', 'EB', 'EG', 'V', 'PP', 'L']: return 'DCE' # 郑州商品交易所 elif underlying in ['WH', 'PM', 'CF', 'SR', 'TA', 'OI', 'RI', 'MA', 'FG', 'RS', 'RM', 'JR', 'LR', 'SM', 'SF', 'CY', 'AP', 'CJ', 'UR', 'SA', 'PF', 'PK']: return 'CZCE' # 中国金融期货交易所 elif underlying in ['IF', 'IC', 'IH', 'T', 'TF', 'TS', 'IM']: return 'CFFEX' # 上海国际能源交易中心 elif underlying in ['SC', 'BC', 'EC']: return 'INE' else: return 'SHFE' # 默认上海 async def fetch_trading_calendar( self, exchange: str, start: str, end: str ) -> List[TradeCalData]: """获取交易日历""" self._check_login() loop = asyncio.get_event_loop() return await loop.run_in_executor( None, lambda: self._fetch_calendar_sync(exchange, start, end) ) def _fetch_calendar_sync( self, exchange: str, start: str, end: str ) -> 
async def health_check(self) -> bool:
    """Report whether the adapter is connected and the data source responds.

    Returns:
        ``False`` straight away when the adapter is not connected or not
        logged in. Otherwise the A-share code list is requested on the
        default executor as a probe: ``True`` if the call completes,
        ``False`` (after logging the error) if it raises.
    """
    if not (self._connected and self._is_logged_in):
        return False

    def _probe():
        # Any successful round-trip counts; the returned list is discarded.
        return self._internal.base.get_code_list(
            security_type=SecurityType.STOCK_A.value
        )

    try:
        await asyncio.get_event_loop().run_in_executor(None, _probe)
    except Exception as exc:
        error(f"AmazingData health check failed: {exc}")
        return False
    return True
async def get_index_weights(
    self,
    codes: List[str],
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    is_local: Optional[bool] = None
) -> Dict[str, pd.DataFrame]:
    """Fetch index constituent weights.

    Indices listed as supported by the original documentation:
        - 000016.SH: SSE 50
        - 000300.SH: CSI 300
        - 000905.SH: CSI 500
        - 000906.SH: CSI 800
        - 000852.SH: CSI 1000

    Args:
        codes: Index codes to query.
        start_date: Optional range start; converted with ``_format_date``
            when truthy, otherwise ``None`` is passed on.
        end_date: Optional range end, handled like ``start_date``.
        is_local: Whether to use the local cache; ``None`` means "use
            ``config.use_local_cache``".

    Returns:
        Whatever ``get_index_weight`` of the internal info service returns
        (annotated as a dict of DataFrames).

    Raises:
        RuntimeError: From ``_check_login`` when the adapter is not logged in.
    """
    self._check_login()

    if is_local is None:
        is_local = self.config.use_local_cache

    begin_date = None
    if start_date:
        begin_date = self._format_date(start_date)
    end_date_int = None
    if end_date:
        end_date_int = self._format_date(end_date)

    def _query():
        # Runs on the executor thread; the SDK call is blocking.
        return self._internal.info.get_index_weight(
            code_list=codes,
            local_path=self.config.local_path,
            is_local=is_local,
            begin_date=begin_date,
            end_date=end_date_int
        )

    return await asyncio.get_event_loop().run_in_executor(None, _query)
'get_cash_flow', codes, start_date, end_date, is_local ) async def get_income_statement( self, codes: List[str], start_date: Optional[str] = None, end_date: Optional[str] = None, is_local: Optional[bool] = None ) -> Dict[str, pd.DataFrame]: """获取利润表""" return await self._get_financial_data( 'get_income', codes, start_date, end_date, is_local ) async def _get_financial_data( self, method: str, codes: List[str], start_date: Optional[str] = None, end_date: Optional[str] = None, is_local: Optional[bool] = None ) -> Dict[str, pd.DataFrame]: """通用财务数据获取方法""" self._check_login() is_local = is_local if is_local is not None else self.config.use_local_cache begin_date = self._format_date(start_date) if start_date else None end_date_int = self._format_date(end_date) if end_date else None loop = asyncio.get_event_loop() def fetch(): fn = getattr(self._info_data, method) return fn( code_list=codes, local_path=self.config.local_path, is_local=is_local, begin_date=begin_date, end_date=end_date_int ) return await loop.run_in_executor(None, fetch) # ==================== 基础数据接口 ==================== async def get_code_info( self, security_type: SecurityType = SecurityType.STOCK_A ) -> pd.DataFrame: """获取证券基本信息 Args: security_type: 证券类型,默认 STOCK_A (沪深A股) Returns: DataFrame 包含字段: - symbol: 证券简称 - security_status: 产品状态标志 - pre_close: 昨收价 - high_limited: 涨停价 - low_limited: 跌停价 - price_tick: 最小价格变动单位 """ self._check_login() loop = asyncio.get_event_loop() return await loop.run_in_executor( None, lambda: self._internal.base.get_code_info(security_type=security_type.value) ) async def get_trading_calendar( self, market: Market = Market.SH ) -> List[int]: """获取交易日历 Args: market: 市场,默认 SH (上海) Returns: 交易日列表,格式为 [20240102, 20240103, ...] 
""" self._check_login() loop = asyncio.get_event_loop() return await loop.run_in_executor( None, lambda: self._internal.base.get_calendar(market=market.value) ) # ==================== 业绩数据接口 ==================== async def get_profit_express( self, codes: List[str], start_date: Optional[str] = None, end_date: Optional[str] = None, is_local: Optional[bool] = None ) -> pd.DataFrame: """获取业绩快报 Args: codes: 股票代码列表 start_date: 开始报告期 (如 20240930 表示2024年三季报) end_date: 结束报告期 is_local: 是否使用本地缓存 Returns: DataFrame 主要字段: - TOTAL_ASSETS: 总资产 - NET_PRO_EXCL_MIN_INT_INC: 净利润 - TOT_OPERA_REV: 营业总收入 - TOTAL_PROFIT: 利润总额 - OPERA_PROFIT: 营业利润 - EPS_BASIC: 基本每股收益 - ROE_WEIGHTED: 净资产收益率-加权 - YOY_GR_NET_PROFIT_PARENT: 同比增长率:归属母公司股东的净利润 """ self._check_login() is_local = is_local if is_local is not None else self.config.use_local_cache begin_date = self._format_date(start_date) if start_date else None end_date_int = self._format_date(end_date) if end_date else None loop = asyncio.get_event_loop() return await loop.run_in_executor( None, lambda: self._internal.info.get_profit_express( code_list=codes, local_path=self.config.local_path, is_local=is_local, begin_date=begin_date, end_date=end_date_int ) ) async def get_profit_notice( self, codes: List[str], start_date: Optional[str] = None, end_date: Optional[str] = None, is_local: Optional[bool] = None ) -> pd.DataFrame: """获取业绩预告 Args: codes: 股票代码列表 start_date: 开始报告期 end_date: 结束报告期 is_local: 是否使用本地缓存 Returns: DataFrame 主要字段: - P_TYPECODE: 业绩预告类型代码 - P_CHANGE_MAX: 预告净利润变动幅度上限 - P_CHANGE_MIN: 预告净利润变动幅度下限 - NET_PROFIT_MAX: 预告净利润上限(万元) - NET_PROFIT_MIN: 预告净利润下限(万元) - P_REASON: 业绩变动原因 """ self._check_login() is_local = is_local if is_local is not None else self.config.use_local_cache begin_date = self._format_date(start_date) if start_date else None end_date_int = self._format_date(end_date) if end_date else None loop = asyncio.get_event_loop() return await loop.run_in_executor( None, lambda: self._internal.info.get_profit_notice( 
code_list=codes, local_path=self.config.local_path, is_local=is_local, begin_date=begin_date, end_date=end_date_int ) ) # ==================== 股东股本数据接口 ==================== async def get_top10_shareholders( self, codes: List[str], start_date: Optional[str] = None, end_date: Optional[str] = None, is_local: Optional[bool] = None ) -> pd.DataFrame: """获取十大股东数据 Args: codes: 股票代码列表 start_date: 开始日期 end_date: 结束日期 is_local: 是否使用本地缓存 Returns: DataFrame 主要字段: - HOLDER_NAME: 股东名称 - HOLDER_QUANTITY: 持股数 - HOLDER_PCT: 持股比例(%) - HOLDER_HOLDER_CATEGORY: 股东性质(1:个人, 2:公司) - FLOAT_QTY: 流通股数量 """ self._check_login() is_local = is_local if is_local is not None else self.config.use_local_cache begin_date = self._format_date(start_date) if start_date else None end_date_int = self._format_date(end_date) if end_date else None loop = asyncio.get_event_loop() return await loop.run_in_executor( None, lambda: self._internal.info.get_share_holder( code_list=codes, local_path=self.config.local_path, is_local=is_local, begin_date=begin_date, end_date=end_date_int ) ) async def get_shareholder_count( self, codes: List[str], start_date: Optional[str] = None, end_date: Optional[str] = None, is_local: Optional[bool] = None ) -> pd.DataFrame: """获取股东户数数据 Args: codes: 股票代码列表 start_date: 开始日期 end_date: 结束日期 is_local: 是否使用本地缓存 Returns: DataFrame 主要字段: - HOLDER_TOTAL_NUM: A股、B股、H股、境外股的总户数 - HOLDER_NUM: A股股东户数 """ self._check_login() is_local = is_local if is_local is not None else self.config.use_local_cache begin_date = self._format_date(start_date) if start_date else None end_date_int = self._format_date(end_date) if end_date else None loop = asyncio.get_event_loop() return await loop.run_in_executor( None, lambda: self._internal.info.get_holder_num( code_list=codes, local_path=self.config.local_path, is_local=is_local, begin_date=begin_date, end_date=end_date_int ) ) async def get_equity_structure( self, codes: List[str], start_date: Optional[str] = None, end_date: Optional[str] = None, is_local: 
async def get_margin_detail(
    self,
    codes: List[str],
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    is_local: Optional[bool] = None
) -> Dict[str, pd.DataFrame]:
    """Fetch per-security margin trading details.

    Args:
        codes: Stock codes to query.
        start_date: Optional range start; converted with ``_format_date``
            when truthy, otherwise ``None`` is passed on.
        end_date: Optional range end, handled like ``start_date``.
        is_local: Whether to use the local cache; ``None`` means "use
            ``config.use_local_cache``".

    Returns:
        The result of ``get_margin_detail`` on the internal info service
        (annotated as a dict keyed by code). Main fields according to the
        original documentation:
            - BORROW_MONEY_BALANCE: margin financing balance
            - PURCH_WITH_BORROW_MONEY: financing purchase amount
            - REPAYMENT_OF_BORROW_MONEY: financing repayment amount
            - SEC_LENDING_BALANCE: securities lending balance
            - SALES_OF_BORROWED_SEC: volume of borrowed securities sold

    Raises:
        RuntimeError: From ``_check_login`` when the adapter is not logged in.
    """
    self._check_login()

    use_cache = self.config.use_local_cache if is_local is None else is_local
    begin_date = self._format_date(start_date) if start_date else None
    end_date_int = self._format_date(end_date) if end_date else None

    def _load_detail():
        # Blocking SDK call, executed on the default executor.
        info_service = self._internal.info
        return info_service.get_margin_detail(
            code_list=codes,
            local_path=self.config.local_path,
            is_local=use_cache,
            begin_date=begin_date,
            end_date=end_date_int
        )

    return await asyncio.get_event_loop().run_in_executor(None, _load_detail)
async def get_fund_share(
    self,
    codes: List[str],
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    is_local: Optional[bool] = None
) -> Dict[str, pd.DataFrame]:
    """Fetch fund share data.

    Args:
        codes: Fund codes to query.
        start_date: Optional range start; converted with ``_format_date``
            when truthy, otherwise ``None`` is passed on.
        end_date: Optional range end, handled like ``start_date``.
        is_local: Whether to use the local cache; ``None`` means "use
            ``config.use_local_cache``".

    Returns:
        The result of ``get_fund_share`` on the internal info service
        (annotated as a dict keyed by code). Main fields according to the
        original documentation:
            - FUND_SHARE: fund shares (10k units)
            - TOTAL_SHARE: total fund shares (10k units)
            - FLOAT_SHARE: tradable shares (10k units)
            - CHANGE_REASON: reason for the share change

    Raises:
        RuntimeError: From ``_check_login`` when the adapter is not logged in.
    """
    self._check_login()

    if is_local is None:
        is_local = self.config.use_local_cache

    begin_date = None
    end_date_int = None
    if start_date:
        begin_date = self._format_date(start_date)
    if end_date:
        end_date_int = self._format_date(end_date)

    def _query():
        # Blocking SDK call, executed on the default executor.
        return self._internal.info.get_fund_share(
            code_list=codes,
            local_path=self.config.local_path,
            is_local=is_local,
            begin_date=begin_date,
            end_date=end_date_int
        )

    executor_future = asyncio.get_event_loop().run_in_executor(None, _query)
    return await executor_future
async def get_history_stock_status(
    self,
    codes: List[str],
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    is_local: Optional[bool] = None
) -> Dict[str, pd.DataFrame]:
    """Fetch historical per-day stock status records.

    Args:
        codes: Stock codes to query.
        start_date: Optional range start (YYYYMMDD); converted with
            ``_format_date`` when truthy, otherwise ``None`` is passed on.
        end_date: Optional range end (YYYYMMDD), handled like ``start_date``.
        is_local: Whether to use the local cache; ``None`` means "use
            ``config.use_local_cache``".

    Returns:
        The result of ``get_history_stock_status`` on the internal info
        service (annotated as a dict keyed by code). Main fields according
        to the original documentation:
            - TRADE_DATE: trade date
            - UP_LIMIT: limit-up price
            - DOWN_LIMIT: limit-down price
            - MAX_UP_DOWN: maximum allowed price move
            - IS_ST: whether the stock is ST
            - IS_SUSPEND: whether trading is suspended

    Raises:
        RuntimeError: From ``_check_login`` when the adapter is not logged in.
    """
    self._check_login()

    use_cache = is_local
    if use_cache is None:
        use_cache = self.config.use_local_cache
    begin_date = self._format_date(start_date) if start_date else None
    end_date_int = self._format_date(end_date) if end_date else None

    def _load_status():
        # Blocking SDK call, executed on the default executor.
        return self._internal.info.get_history_stock_status(
            code_list=codes,
            local_path=self.config.local_path,
            is_local=use_cache,
            begin_date=begin_date,
            end_date=end_date_int
        )

    return await asyncio.get_event_loop().run_in_executor(None, _load_status)
Returns: List[Dict] containing fields: - symbol: Symbol code - ts: Timestamp - trade_date: Trade date - open/high/low/close: Price data - volume: Trading volume - amount: Trading amount - adj_factor: Adjustment factor """ print(f"[amazingdata_adapter fetch_kline_base]Fetching {symbol} {freq} base data...") self._check_login() period_map = { "1m": self._ad.constant.Period.min1, "5m": self._ad.constant.Period.min5, "15m": self._ad.constant.Period.min15, "30m": self._ad.constant.Period.min30, "60m": self._ad.constant.Period.min60, "1d": self._ad.constant.Period.day, "1w": self._ad.constant.Period.week, "1month": self._ad.constant.Period.month, } period_value = period_map.get(freq, self._ad.constant.Period.day).value loop = asyncio.get_event_loop() return await loop.run_in_executor( None, lambda: self._fetch_kline_base_sync(symbol, start, end, period_value, freq) ) def _fetch_kline_base_sync( self, symbol: str, start_date: str, end_date: str, period_value: int, freq: str ) -> List[Dict[str, Any]]: """Sync method to fetch K-line base data""" codes = [symbol] start_int = self._format_date(start_date) end_int = self._format_date(end_date) kline_dict = self._internal.market.query_kline( code_list=codes, begin_date=start_int, end_date=end_int, period=period_value ) if symbol not in kline_dict: info(f"No kline data found for {symbol}") return [] df = kline_dict[symbol] results = [] for _, row in df.iterrows(): kline_time = row.get('kline_time') if pd.isna(kline_time) or kline_time is None: continue if isinstance(kline_time, pd.Timestamp): ts = int(kline_time.timestamp()) trade_date = kline_time.strftime('%Y-%m-%d') else: date_str = str(int(kline_time)) if len(date_str) != 8: continue dt = datetime.strptime(date_str, "%Y%m%d") ts = int(dt.timestamp()) trade_date = dt.strftime('%Y-%m-%d') results.append({ "symbol": symbol, "ts": ts, "trade_date": trade_date, "open": float(row.get('open', 0)), "high": float(row.get('high', 0)), "low": float(row.get('low', 0)), "close": 
float(row.get('close', 0)), "volume": int(row.get('volume', 0)), "amount": float(row.get('amount', 0)), "adj_factor": float(row.get('adj_factor', 1.0)) if 'adj_factor' in df.columns else 1.0, }) info(f"Fetched {len(results)} base kline records for {symbol}") return results async def fetch_kline_quote( self, symbol: str, start: str, end: str ) -> List[Dict[str, Any]]: """Fetch daily quote indicator data (calculated) Corresponding table: stock_klines_1d_quote Returns: List[Dict] containing fields: - change_pct: Price change percentage - change_Nd_pct: N-day price change - ma_N: Moving averages - macd_dif/dea/bar: MACD indicators - bias_N: Bias ratios - is_limit_up/down: Limit up/down status - is_st: ST status """ print(f"[amazingdata_adapter fetch_kline_quote]Calculating {symbol} quote indicators...") self._check_login() loop = asyncio.get_event_loop() return await loop.run_in_executor( None, lambda: self._fetch_kline_quote_sync(symbol, start, end) ) def _fetch_kline_quote_sync( self, symbol: str, start_date: str, end_date: str ) -> List[Dict[str, Any]]: """Sync method to calculate quote indicators""" import numpy as np start_dt = datetime.strptime(start_date, "%Y%m%d") extended_start = datetime(start_dt.year - 1, start_dt.month, start_dt.day) extended_start_str = extended_start.strftime("%Y%m%d") codes = [symbol] start_int = self._format_date(extended_start_str) end_int = self._format_date(end_date) kline_dict = self._internal.market.query_kline( code_list=codes, begin_date=start_int, end_date=end_int, period=self._ad.constant.Period.day.value ) if symbol not in kline_dict: return [] df = kline_dict[symbol].copy() df = df.sort_values('kline_time') try: code_info_df = self._internal.base.get_code_info(security_type=SecurityType.STOCK_A.value) if symbol in code_info_df.index: high_limited = float(code_info_df.loc[symbol, 'high_limited']) if 'high_limited' in code_info_df.columns else None low_limited = float(code_info_df.loc[symbol, 'low_limited']) if 'low_limited' in 
code_info_df.columns else None else: high_limited = low_limited = None except: high_limited = low_limited = None results = [] closes = df['close'].values for i, (_, row) in enumerate(df.iterrows()): kline_time = row.get('kline_time') if pd.isna(kline_time): continue if isinstance(kline_time, pd.Timestamp): trade_date = kline_time.strftime('%Y-%m-%d') else: date_str = str(int(kline_time)) if len(date_str) != 8: continue dt = datetime.strptime(date_str, "%Y%m%d") trade_date = dt.strftime('%Y-%m-%d') close = float(row.get('close', 0)) change_pct = None if i > 0: prev_close = closes[i-1] if prev_close > 0: change_pct = round((close - prev_close) / prev_close * 100, 4) def calc_n_day_change(n): if i >= n and closes[i-n] > 0: return round((close - closes[i-n]) / closes[i-n] * 100, 4) return None def calc_ma(n): if i >= n - 1: return round(np.mean(closes[i-n+1:i+1]), 4) return None def calc_macd(): if i < 33: return None, None, None ema12 = pd.Series(closes[:i+1]).ewm(span=12).mean().iloc[-1] ema26 = pd.Series(closes[:i+1]).ewm(span=26).mean().iloc[-1] dif = ema12 - ema26 dea = pd.Series([ema12 - ema26 for _ in range(i+1)]).ewm(span=9).mean().iloc[-1] bar = (dif - dea) * 2 return round(dif, 6), round(dea, 6), round(bar, 6) def calc_bias(n): ma = calc_ma(n) if ma and ma > 0: return round((close - ma) / ma * 100, 4) return None is_limit_up = False is_limit_down = False if high_limited and low_limited and close > 0: is_limit_up = close >= high_limited * 0.995 is_limit_down = close <= low_limited * 1.005 macd_dif, macd_dea, macd_bar = calc_macd() if trade_date.replace('-', '') >= start_date: results.append({ "symbol": symbol, "trade_date": trade_date, "change_pct": change_pct, "change_5d_pct": calc_n_day_change(5), "change_10d_pct": calc_n_day_change(10), "change_20d_pct": calc_n_day_change(20), "change_30d_pct": calc_n_day_change(30), "change_60d_pct": calc_n_day_change(60), "macd_dif": macd_dif, "macd_dea": macd_dea, "macd_bar": macd_bar, "bias_5": calc_bias(5), "bias_10": 
calc_bias(10), "bias_20": calc_bias(20), "is_limit_up": is_limit_up, "is_limit_down": is_limit_down, "limit_up_price": round(high_limited, 4) if high_limited else None, "limit_down_price": round(low_limited, 4) if low_limited else None, "is_st": None, "ma_5": calc_ma(5), "ma_10": calc_ma(10), "ma_20": calc_ma(20), "ma_30": calc_ma(30), "ma_60": calc_ma(60), "ma_120": calc_ma(120), "ma_250": calc_ma(250), }) info(f"Calculated {len(results)} quote indicators for {symbol}") return results async def fetch_kline_finance( self, symbol: str, start: str, end: str ) -> List[Dict[str, Any]]: """Fetch daily finance data Corresponding table: stock_klines_1d_finance Data sources: get_equity_structure, get_share_holder, get_income Returns: List[Dict] containing fields: - total_market_cap: Total market cap - float_market_cap: Float market cap - total_shares: Total shares - float_shares: Float shares - inst_holding_shares: Institutional holding shares - inst_holding_ratio: Institutional holding ratio - net_profit: Net profit - revenue: Revenue - eps: EPS - roe: ROE """ print(f"[amazingdata_adapter fetch_kline_finance]Fetching {symbol} finance data...") self._check_login() loop = asyncio.get_event_loop() return await loop.run_in_executor( None, lambda: self._fetch_kline_finance_sync(symbol, start, end) ) def _fetch_kline_finance_sync( self, symbol: str, start_date: str, end_date: str ) -> List[Dict[str, Any]]: """Sync method to fetch finance data""" codes = [symbol] start_int = self._format_date(start_date) end_int = self._format_date(end_date) results = [] try: equity_dict = self._internal.info.get_equity_structure( code_list=codes, local_path=self.config.local_path, is_local=self.config.use_local_cache ) equity_data = {} if symbol in equity_dict: equity_df = equity_dict[symbol] for _, row in equity_df.iterrows(): ann_date = row.get('ANN_DATE') if pd.notna(ann_date): if isinstance(ann_date, (int, float)): date_key = str(int(ann_date)) else: date_key = str(ann_date).replace('-', 
'').replace('/', '') equity_data[date_key] = { 'total_shares': float(row.get('TOT_A_SHARE', 0)) * 10000 if pd.notna(row.get('TOT_A_SHARE')) else 0, 'float_shares': float(row.get('FLOAT_A_SHARE', 0)) * 10000 if pd.notna(row.get('FLOAT_A_SHARE')) else 0, } except Exception as e: print(f"[amazingdata_adapter]Failed to get equity structure: {e}") equity_data = {} kline_dict = self._internal.market.query_kline( code_list=codes, begin_date=start_int, end_date=end_int, period=self._ad.constant.Period.day.value ) if symbol not in kline_dict: return [] df = kline_dict[symbol] for _, row in df.iterrows(): kline_time = row.get('kline_time') if pd.isna(kline_time): continue if isinstance(kline_time, pd.Timestamp): trade_date = kline_time.strftime('%Y-%m-%d') trade_date_int = int(kline_time.strftime('%Y%m%d')) else: date_str = str(int(kline_time)) if len(date_str) != 8: continue dt = datetime.strptime(date_str, "%Y%m%d") trade_date = dt.strftime('%Y-%m-%d') trade_date_int = int(date_str) close = float(row.get('close', 0)) total_shares = 0 float_shares = 0 for date_key in sorted(equity_data.keys(), reverse=True): if int(date_key) <= trade_date_int: total_shares = equity_data[date_key]['total_shares'] float_shares = equity_data[date_key]['float_shares'] break total_market_cap = close * total_shares if total_shares > 0 and close > 0 else None float_market_cap = close * float_shares if float_shares > 0 and close > 0 else None results.append({ "symbol": symbol, "trade_date": trade_date, "total_market_cap": round(total_market_cap, 2) if total_market_cap else None, "float_market_cap": round(float_market_cap, 2) if float_market_cap else None, "total_shares": int(total_shares) if total_shares > 0 else None, "float_shares": int(float_shares) if float_shares > 0 else None, "inst_holding_shares": None, "inst_holding_ratio": None, "top10_holders_ratio": None, "net_profit": None, "revenue": None, "eps": None, "roe": None, "trading_days": None, }) info(f"Fetched {len(results)} finance records 
for {symbol}") return results async def fetch_stock_basic_info( self, codes: Optional[List[str]] = None ) -> List[Dict[str, Any]]: """Fetch stock basic info Corresponding table: stock_symbols Data source: get_stock_basic Returns: List[Dict] containing fields: - symbol_id: Symbol code - name: Name - exchange: Exchange - list_date: List date - list_board: List board - industry: Industry - status: Status - is_delisted: Is delisted - delist_date: Delist date """ print(f"[amazingdata_adapter fetch_stock_basic_info]Fetching stock basic info...") self._check_login() loop = asyncio.get_event_loop() return await loop.run_in_executor( None, lambda: self._fetch_stock_basic_info_sync(codes) ) def _fetch_stock_basic_info_sync( self, codes: Optional[List[str]] = None ) -> List[Dict[str, Any]]: """Sync method to fetch stock basic info""" try: all_codes = self._internal.base.get_code_list( security_type=SecurityType.STOCK_A.value ) if codes: all_codes = [c for c in all_codes if c in codes] info_df = self._internal.base.get_code_info( security_type=SecurityType.STOCK_A.value ) results = [] for code in all_codes: if ".SH" in code: exchange = "SH" elif ".SZ" in code: exchange = "SZ" elif ".BJ" in code: exchange = "BJ" else: exchange = "" name = code if code in info_df.index and 'symbol' in info_df.columns: name = info_df.loc[code, 'symbol'] list_date = None try: equity_dict = self._internal.info.get_equity_structure( code_list=[code], local_path=self.config.local_path, is_local=self.config.use_local_cache ) if code in equity_dict and not equity_dict[code].empty: first_record = equity_dict[code].iloc[0] ann_date = first_record.get('ANN_DATE') if pd.notna(ann_date): if isinstance(ann_date, (int, float)): list_date = datetime.strptime(str(int(ann_date)), "%Y%m%d") else: list_date = pd.to_datetime(ann_date) except: pass results.append({ "symbol_id": code, "name": name, "exchange": exchange, "list_date": list_date, "list_board": None, "industry": None, "status": "active", "is_delisted": 
False, "delist_date": None, "is_st": None, "total_shares": None, "float_shares": None, }) info(f"Fetched {len(results)} stock basic info records") return results except Exception as e: error(f"Failed to fetch stock basic info: {e}") return []