From 6da01034366cfa6a60bc1f6a41302ecd5ec0fa45 Mon Sep 17 00:00:00 2001 From: Lxy Date: Wed, 11 Mar 2026 11:55:36 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E5=8E=BB=E9=99=A4akshare?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/__pycache__/main.cpython-311.pyc | Bin 64369 -> 64360 bytes app/adapters/__init__.py | 2 - .../__pycache__/__init__.cpython-311.pyc | Bin 645 -> 570 bytes app/adapters/akshare_adapter.py | 637 ------------------ app/core/__pycache__/config.cpython-311.pyc | Bin 9051 -> 8896 bytes app/core/config.py | 8 +- app/main.py | 2 +- app/models/__pycache__/types.cpython-311.pyc | Bin 21953 -> 21957 bytes app/models/types.py | 4 +- .../adapter_service.cpython-311.pyc | Bin 14141 -> 13553 bytes .../futures_service.cpython-311.pyc | Bin 12463 -> 12450 bytes .../__pycache__/stock_service.cpython-311.pyc | Bin 12663 -> 12650 bytes .../__pycache__/test_service.cpython-311.pyc | Bin 19192 -> 19200 bytes app/services/adapter_service.py | 33 +- app/services/futures_service.py | 6 +- app/services/stock_service.py | 6 +- app/services/test_service.py | 8 +- config.json | 14 +- requirements.txt | 3 - scripts/sync_data.py | 14 +- scripts/test_extended_fields.py | 35 +- tests/test_xysz_integration.py | 3 +- 22 files changed, 39 insertions(+), 736 deletions(-) delete mode 100644 app/adapters/akshare_adapter.py diff --git a/app/__pycache__/main.cpython-311.pyc b/app/__pycache__/main.cpython-311.pyc index c04813d25fd15091598500a59e851f995ad76226..ca6054d5803371cd1c62cf877797f89fdb8b5da7 100644 GIT binary patch delta 100 zcmezPjrqklX5Qtzyj%=Gu;A&2%*h*hAFv8qx>&^|<`iTk#+T<8WfvDDZsupJVrJa8 zxrPQnVbKyRxvYf+g!;hv|J;*I3uwrRjsfzzr)(h^I>mtS|o)orV z22J*fS>^nEj^4o;iAAZ7DTxIosYMg-i8FFc)@GFBE#d^4R0J~5Pm_1D45OY7e`0nq zNNIc`L@ijwE#bu6#H!4^^pwPsM7Rvs diff --git a/app/adapters/akshare_adapter.py b/app/adapters/akshare_adapter.py deleted file mode 100644 index 7b6d025..0000000 --- a/app/adapters/akshare_adapter.py +++ /dev/null @@ -1,637 +0,0 @@ -"""AKShare数据源适配器 - 使用新浪接口""" -import asyncio -import time -from datetime import datetime -from typing import List, Optional, Dict, Any - -import akshare as ak -import pandas as pd - -from app.adapters.base import ( - DataSourceAdapter, TickData, KLineData, SymbolInfo, - TradeCalData, TickCallback -) -from app.core.logger import info, error, warning - - -# 缓存字典,用于存储股票基本信息和交易日历 -_stock_info_cache: Dict[str, Dict[str, Any]] = {} -_trade_calendar_cache: Optional[pd.DataFrame] = None -_inst_holding_cache: Dict[str, Dict[str, Any]] = {} - - -class AKShareAdapter(DataSourceAdapter): - """AKShare数据源适配器 - 使用新浪接口""" - - def __init__(self): - self.config = {} - self._connected = False - self._max_retries = 3 - self._retry_delay = 2 # 秒 - - # 实例缓存 - self._stock_info_cache: Dict[str, Dict[str, Any]] = {} - self._trade_calendar_cache: Optional[pd.DataFrame] = None - self._inst_holding_cache: Dict[str, Dict[str, Any]] = {} - - def _get_stock_code_from_symbol(self, symbol: str) -> str: - """从 symbol 中提取股票代码: 000001.SZ -> 000001""" - if "." in symbol: - return symbol.split(".")[0] - return symbol - - async def _get_trade_calendar(self) -> pd.DataFrame: - """获取交易日历(带缓存)""" - if self._trade_calendar_cache is None: - try: - df = await self._fetch_with_retry(ak.tool_trade_date_hist_sina) - if df is not None and not df.empty: - df['trade_date'] = pd.to_datetime(df['trade_date']) - self._trade_calendar_cache = df - info(f"Loaded trade calendar with {len(df)} trading days") - except Exception as e: - error(f"Failed to load trade calendar: {e}") - return pd.DataFrame() - return self._trade_calendar_cache if self._trade_calendar_cache is not None else pd.DataFrame() - - async def _get_stock_info(self, stock_code: str) -> Dict[str, Any]: - """获取股票基本信息(带缓存)""" - if stock_code not in self._stock_info_cache: - try: - # 使用东财接口获取个股信息 - info_df = await self._fetch_with_retry(ak.stock_individual_info_em, symbol=stock_code) - if info_df is not None and not info_df.empty: - info_dict = dict(zip(info_df['item'], info_df['value'])) - self._stock_info_cache[stock_code] = info_dict - info(f"Loaded stock info for {stock_code}") - except Exception as e: - error(f"Failed to get stock info for {stock_code}: {e}") - return {} - return self._stock_info_cache.get(stock_code, {}) - - async def _get_trading_days_count(self, stock_code: str, trade_date: datetime) -> int: - """获取可交易日数(从上市至今)""" - try: - stock_info = await self._get_stock_info(stock_code) - listing_date_str = str(stock_info.get('上市时间', '')) - - if not listing_date_str or listing_date_str == 'nan': - return 0 - - listing_date = datetime.strptime(listing_date_str, '%Y%m%d').date() - trade_calendar = await self._get_trade_calendar() - - if trade_calendar.empty: - return 0 - - # 计算从上市到指定日期的交易日数 - trade_calendar['trade_date'] = pd.to_datetime(trade_calendar['trade_date']).dt.date - trading_days = trade_calendar[ - (trade_calendar['trade_date'] >= listing_date) & - (trade_calendar['trade_date'] <= trade_date.date()) - ] - - return len(trading_days) - except Exception as e: - error(f"Failed to calculate trading days for {stock_code}: {e}") - return 0 - - async def _check_limit_up_down(self, stock_code: str, trade_date: str, close_price: float) -> tuple: - """检查是否涨停或跌停""" - try: - # 获取涨停/跌停股票池 - zt_df = await self._fetch_with_retry(ak.stock_zt_pool_em, date=trade_date) - dt_df = await self._fetch_with_retry(ak.stock_zt_pool_dtgc_em, date=trade_date) - - is_limit_up = False - is_limit_down = False - - if zt_df is not None and not zt_df.empty: - zt_list = zt_df['代码'].astype(str).tolist() - is_limit_up = stock_code in zt_list - - if dt_df is not None and not dt_df.empty: - dt_list = dt_df['代码'].astype(str).tolist() - is_limit_down = stock_code in dt_list - - return is_limit_up, is_limit_down - except Exception as e: - # 涨停跌停池接口可能不支持历史日期,失败时返回False - info(f"Could not get limit up/down info for {stock_code} on {trade_date}: {e}") - return False, False - - async def _get_market_cap(self, stock_code: str) -> tuple: - """获取总市值和流通市值""" - try: - stock_info = await self._get_stock_info(stock_code) - - total_cap = stock_info.get('总市值', 0) - float_cap = stock_info.get('流通市值', 0) - - # 转换为浮点数 - total_cap = float(total_cap) if total_cap and str(total_cap) != 'nan' else 0.0 - float_cap = float(float_cap) if float_cap and str(float_cap) != 'nan' else 0.0 - - return total_cap, float_cap - except Exception as e: - error(f"Failed to get market cap for {stock_code}: {e}") - return 0.0, 0.0 - - async def _get_inst_holding_ratio(self, stock_code: str) -> float: - """获取机构持仓占比""" - # 缓存键使用季度标识(简化处理,实际应按财报季度) - cache_key = f"{stock_code}_latest" - - if cache_key not in self._inst_holding_cache: - try: - # 获取基金持仓数据 - fund_holder_df = await self._fetch_with_retry(ak.stock_fund_stock_holder, symbol=stock_code) - - if fund_holder_df is not None and not fund_holder_df.empty: - # 获取最新季度的数据 - latest_quarter = fund_holder_df['季度'].iloc[0] - latest_df = fund_holder_df[fund_holder_df['季度'] == latest_quarter] - - # 计算机构持仓占比合计 - total_ratio = 0.0 - if '占总股本比例' in latest_df.columns: - ratios = pd.to_numeric(latest_df['占总股本比例'], errors='coerce').fillna(0) - total_ratio = ratios.sum() - - self._inst_holding_cache[cache_key] = { - 'quarter': latest_quarter, - 'ratio': total_ratio - } - info(f"Loaded inst holding for {stock_code}: {total_ratio:.4f}% in {latest_quarter}") - else: - self._inst_holding_cache[cache_key] = {'quarter': '', 'ratio': 0.0} - except Exception as e: - error(f"Failed to get inst holding for {stock_code}: {e}") - return 0.0 - - return self._inst_holding_cache.get(cache_key, {}).get('ratio', 0.0) - - async def connect(self, config: dict) -> None: - """建立连接(AKShare无需认证)""" - self.config = config - self._connected = True - info("AKShare adapter connected (Sina API)") - - async def subscribe_ticks(self, symbols: List[str], callback: TickCallback) -> None: - """订阅实时Tick(AKShare不支持实时推送)""" - raise NotImplementedError("AKShare does not support real-time tick subscription") - - async def fetch_klines( - self, - symbol: str, - start: str, - end: str, - freq: str - ) -> List[KLineData]: - info(f"Fetching KLines from Sina for {symbol} [{freq}] from {start} to {end}") - """拉取历史K线""" - # 判断是股票还是期货 - if ".SH" in symbol or ".SZ" in symbol or ".BJ" in symbol: - return await self._fetch_stock_klines(symbol, start, end, freq) - elif "." in symbol: # 期货格式: CU2504.SHFE - return await self._fetch_futures_klines(symbol, start, end, freq) - else: - raise ValueError(f"Unknown symbol format: {symbol}") - - async def _fetch_stock_klines( - self, - symbol: str, - start: str, - end: str, - freq: str - ) -> List[KLineData]: - """获取股票K线 - 使用新浪接口""" - # 转换symbol格式: 000001.SZ -> sz000001 - ts_code = self._normalize_stock_symbol(symbol) - - if freq in ["1d", "day", "D", ""]: - return await self._fetch_stock_daily_sina(ts_code, symbol, start, end) - elif freq in ["1m", "5m", "15m", "30m", "60m"]: - return await self._fetch_stock_minute_sina(ts_code, symbol, start, end, freq) - else: - raise ValueError(f"Unsupported frequency: {freq}") - - def _normalize_stock_symbol(self, symbol: str) -> str: - """转换股票代码格式: 000001.SZ -> sz000001""" - if "." in symbol: - code, exchange = symbol.split(".") - exchange_map = { - "SH": "sh", - "SZ": "sz", - "BJ": "bj" - } - return exchange_map.get(exchange, "sz") + code - return symbol - - def _denormalize_stock_symbol(self, symbol: str) -> str: - """还原股票代码格式: sz000001 -> 000001.SZ""" - if symbol.startswith("sh"): - return symbol[2:] + ".SH" - elif symbol.startswith("sz"): - return symbol[2:] + ".SZ" - elif symbol.startswith("bj"): - return symbol[2:] + ".BJ" - return symbol - - async def _fetch_with_retry(self, func, *args, **kwargs): - """带重试机制的调用""" - last_exception = None - - for attempt in range(self._max_retries): - try: - loop = asyncio.get_event_loop() - return await loop.run_in_executor(None, lambda: func(*args, **kwargs)) - except Exception as e: - last_exception = e - error_msg = str(e).lower() - - # 检查是否是可重试的错误 - if any(x in error_msg for x in ['connection', 'timeout', 'remote', 'reset', 'closed']): - if attempt < self._max_retries - 1: - warning(f"Sina API request failed (attempt {attempt + 1}/{self._max_retries}): {e}") - await asyncio.sleep(self._retry_delay * (attempt + 1)) # 指数退避 - continue - - # 不可重试的错误,直接抛出 - raise - - raise last_exception - - async def _fetch_stock_daily_sina( - self, - ts_code: str, - original_symbol: str, - start_date: str, - end_date: str - ) -> List[KLineData]: - """获取股票日线 - 使用新浪接口(包含扩展字段)""" - try: - # 新浪接口获取历史数据 - # 使用 stock_zh_a_daily 接口(新浪) - df = await self._fetch_with_retry( - ak.stock_zh_a_daily, - symbol=ts_code, - start_date=start_date, - end_date=end_date, - adjust="qfq" # 前复权 - ) - - if df is None or df.empty: - warning(f"No data returned from Sina for {original_symbol}") - return [] - - # 获取股票代码(不带交易所后缀) - stock_code = self._get_stock_code_from_symbol(original_symbol) - - # 预获取市值和机构持仓数据(这些不随日期变化) - total_cap, float_cap = await self._get_market_cap(stock_code) - inst_ratio = await self._get_inst_holding_ratio(stock_code) - - results = [] - for _, row in df.iterrows(): - trade_date = datetime.strptime(str(row['date']), "%Y-%m-%d") - trade_date_str = trade_date.strftime("%Y%m%d") - close_price = float(row['close']) - - # 获取可交易日数(只计算一次,以当前日期为准) - trading_days = await self._get_trading_days_count(stock_code, trade_date) - - # 检查涨停跌停(注意:历史数据可能无法准确判断) - is_limit_up, is_limit_down = await self._check_limit_up_down( - stock_code, trade_date_str, close_price - ) - - results.append(KLineData( - symbol=original_symbol, - time=int(trade_date.timestamp()), - open=float(row['open']), - high=float(row['high']), - low=float(row['low']), - close=close_price, - volume=int(row['volume']), - amount=float(row.get('amount', 0)), - trade_date=trade_date.strftime('%Y-%m-%d'), - is_limit_up=is_limit_up, - is_limit_down=is_limit_down, - total_market_cap=total_cap, - float_market_cap=float_cap, - inst_holding_ratio=inst_ratio, - trading_days=trading_days - )) - - info(f"Fetched {len(results)} daily klines with extended fields from Sina for {original_symbol}") - return results - - except Exception as e: - error(f"Failed to fetch stock daily from Sina for {original_symbol}: {e}") - # 新浪接口失败时返回空列表 - return [] - - async def _fetch_stock_minute_sina( - self, - ts_code: str, - original_symbol: str, - start_date: str, - end_date: str, - freq: str - ) -> List[KLineData]: - """获取股票分钟线 - 使用新浪接口""" - try: - # 新浪分钟线接口 - # 使用 stock_zh_a_minute 接口 - df = await self._fetch_with_retry( - ak.stock_zh_a_minute, - symbol=ts_code, - period=freq.replace("m", ""), # 1m -> 1 - adjust="qfq" - ) - - if df is None or df.empty: - return [] - - # 过滤日期范围 - df['date'] = pd.to_datetime(df['date']) - start_dt = datetime.strptime(start_date, "%Y%m%d") - end_dt = datetime.strptime(end_date, "%Y%m%d") - df = df[(df['date'] >= start_dt) & (df['date'] <= end_dt)] - - results = [] - for _, row in df.iterrows(): - trade_time = datetime.strptime(str(row['date']), "%Y-%m-%d %H:%M:%S") - results.append(KLineData( - symbol=original_symbol, - time=int(trade_time.timestamp()), - open=float(row['open']), - high=float(row['high']), - low=float(row['low']), - close=float(row['close']), - volume=int(row['volume']), - amount=float(row.get('amount', 0)) - )) - - return results - except Exception as e: - error(f"Failed to fetch stock minute from Sina for {original_symbol}: {e}") - return [] - - async def _fetch_futures_klines( - self, - symbol: str, - start: str, - end: str, - freq: str - ) -> List[KLineData]: - """获取期货K线 - 使用新浪接口""" - if freq in ["1d", "day", "D", ""]: - return await self._fetch_futures_daily_sina(symbol, start, end) - elif freq in ["1m", "5m", "15m", "30m", "60m"]: - return await self._fetch_futures_minute_sina(symbol, start, end, freq) - else: - raise ValueError(f"Unsupported frequency: {freq}") - - async def _fetch_futures_daily_sina( - self, - symbol: str, - start_date: str, - end_date: str - ) -> List[KLineData]: - """获取期货日线 - 使用新浪接口""" - try: - # 解析合约代码: CU2504.SHFE -> cu2504 - contract_code, exchange = symbol.split(".") - contract_code = contract_code.lower() - - # 新浪期货历史行情接口 - df = await self._fetch_with_retry( - ak.futures_zh_daily, - symbol=contract_code, - start_date=start_date, - end_date=end_date - ) - - if df is None or df.empty: - return [] - - results = [] - for _, row in df.iterrows(): - trade_date = datetime.strptime(str(row['date']), "%Y-%m-%d") - results.append(KLineData( - symbol=symbol, - time=int(trade_date.timestamp()), - open=float(row['open']), - high=float(row['high']), - low=float(row['low']), - close=float(row['close']), - volume=int(row['volume']), - amount=float(row.get('amount', 0)), - open_interest=int(row.get('hold', 0)) - )) - - return results - except Exception as e: - error(f"Failed to fetch futures daily from Sina for {symbol}: {e}") - return [] - - async def _fetch_futures_minute_sina( - self, - symbol: str, - start_date: str, - end_date: str, - freq: str - ) -> List[KLineData]: - """获取期货分钟线 - 使用新浪接口""" - try: - # 解析合约代码 - contract_code, exchange = symbol.split(".") - contract_code = contract_code.lower() - - # 新浪期货分钟线接口 - df = await self._fetch_with_retry( - ak.futures_zh_minute_sina, - symbol=contract_code, - period=freq.replace("m", "") - ) - - if df is None or df.empty: - return [] - - # 过滤日期范围 - df['datetime'] = pd.to_datetime(df['datetime']) - start_dt = datetime.strptime(start_date, "%Y%m%d") - end_dt = datetime.strptime(end_date, "%Y%m%d") - df = df[(df['datetime'] >= start_dt) & (df['datetime'] <= end_dt)] - - results = [] - for _, row in df.iterrows(): - trade_time = row['datetime'] - results.append(KLineData( - symbol=symbol, - time=int(trade_time.timestamp()), - open=float(row['open']), - high=float(row['high']), - low=float(row['low']), - close=float(row['close']), - volume=int(row['volume']), - amount=0, - open_interest=0 - )) - - return results - except Exception as e: - error(f"Failed to fetch futures minute from Sina for {symbol}: {e}") - return [] - - async def fetch_symbols(self, asset_type: str) -> List[SymbolInfo]: - """获取标的列表""" - if asset_type == "stock": - return await self._fetch_stock_symbols_sina() - elif asset_type == "futures": - return await self._fetch_futures_symbols_sina() - else: - raise ValueError(f"Unsupported asset type: {asset_type}") - - async def _fetch_stock_symbols_sina(self) -> List[SymbolInfo]: - """获取A股股票列表 - 使用新浪接口""" - try: - # 新浪A股列表接口 - df = await self._fetch_with_retry(ak.stock_zh_a_spot) - - if df is None or df.empty: - return [] - - results = [] - for _, row in df.iterrows(): - # 新浪接口的代码格式 - code = str(row['代码']) - if code.startswith('6') or code.startswith('5') or code.startswith('9'): - ts_code = f"{code}.SH" - exchange = "SH" - elif code.startswith('8') or code.startswith('4'): - ts_code = f"{code}.BJ" - exchange = "BJ" - else: - ts_code = f"{code}.SZ" - exchange = "SZ" - - results.append(SymbolInfo( - symbol_id=ts_code, - name=str(row['名称']), - exchange=exchange - )) - - info(f"Fetched {len(results)} stock symbols from Sina") - return results - except Exception as e: - error(f"Failed to fetch stock symbols from Sina: {e}") - return [] - - async def _fetch_futures_symbols_sina(self) -> List[SymbolInfo]: - """获取期货合约列表 - 使用新浪接口""" - try: - # 新浪期货列表接口 - df = await self._fetch_with_retry(ak.futures_zh_realtime, subscribe_list=["0", "1", "2", "3"]) - - if df is None or df.empty: - return [] - - results = [] - for _, row in df.iterrows(): - symbol = str(row['symbol']) - underlying = ''.join([c for c in symbol if c.isalpha()]).upper() - contract_month = ''.join([c for c in symbol if c.isdigit()]) - - exchange = self._get_futures_exchange(underlying) - ts_code = f"{symbol.upper()}.{exchange}" - - results.append(SymbolInfo( - symbol_id=ts_code, - name=str(row.get('name', symbol)), - exchange=exchange, - underlying=underlying, - contract_month=contract_month - )) - - info(f"Fetched {len(results)} futures symbols from Sina") - return results - except Exception as e: - error(f"Failed to fetch futures symbols from Sina: {e}") - return [] - - def _get_futures_exchange(self, underlying: str) -> str: - """根据品种代码判断交易所""" - # 上海期货交易所 - if underlying in ['CU', 'AL', 'ZN', 'PB', 'NI', 'SN', 'AU', 'AG', 'RB', 'HC', - 'BU', 'RU', 'FU', 'SP', 'WR', 'SS', 'LU', 'NR']: - return 'SHFE' - # 大连商品交易所 - elif underlying in ['A', 'B', 'M', 'Y', 'P', 'C', 'CS', 'JD', 'LH', 'JM', - 'J', 'I', 'FB', 'BB', 'RR', 'PG', 'EB', 'EG', 'V', 'PP', 'L']: - return 'DCE' - # 郑州商品交易所 - elif underlying in ['WH', 'PM', 'CF', 'SR', 'TA', 'OI', 'RI', 'MA', 'FG', 'RS', - 'RM', 'JR', 'LR', 'SM', 'SF', 'CY', 'AP', 'CJ', 'UR', 'SA', 'PF', 'PK']: - return 'CZCE' - # 中国金融期货交易所 - elif underlying in ['IF', 'IC', 'IH', 'T', 'TF', 'TS', 'IM']: - return 'CFFEX' - # 上海国际能源交易中心 - elif underlying in ['SC', 'BC', 'EC']: - return 'INE' - else: - return 'SHFE' # 默认上海 - - async def fetch_trading_calendar( - self, - exchange: str, - start: str, - end: str - ) -> List[TradeCalData]: - """获取交易日历 - 使用新浪接口""" - try: - # 新浪交易日历接口 - df = await self._fetch_with_retry(ak.tool_trade_date_hist_sina) - - if df is None or df.empty: - return [] - - # 过滤日期范围 - df['trade_date'] = pd.to_datetime(df['trade_date']) - start_dt = datetime.strptime(start, "%Y%m%d") - end_dt = datetime.strptime(end, "%Y%m%d") - df = df[(df['trade_date'] >= start_dt) & (df['trade_date'] <= end_dt)] - - results = [] - for _, row in df.iterrows(): - cal_date = row['trade_date'] - results.append(TradeCalData( - date=cal_date, - is_trading_day=True - )) - - return results - except Exception as e: - error(f"Failed to fetch trading calendar from Sina: {e}") - return [] - - async def health_check(self) -> bool: - """健康检查""" - try: - if not self._connected: - return False - - # 尝试获取股票列表作为健康检查 - df = await self._fetch_with_retry(ak.stock_zh_a_spot) - return df is not None and not df.empty - except Exception as e: - error(f"Health check failed: {e}") - return False - - async def close(self) -> None: - """关闭连接""" - self._connected = False - info("AKShare adapter closed (Sina API)") diff --git a/app/core/__pycache__/config.cpython-311.pyc b/app/core/__pycache__/config.cpython-311.pyc index 80d793b3b3ea87ea9d861097f874bc42f0ca597f..51ef055881cd83f24f93b75a58e6a1ec8ab924fa 100644 GIT binary patch delta 950 zcmb7@O=uHA6vua(Y_>@@+qF$%+Rg6P3Vvg$6~(spP)(|+sj;OJ{32#+h&Dy01)oK;K_OtCDe-uo?7U|o9}IdVyy><-CzFm=Dj!n*&(0f zuVczgAm9^ljC_5P`=LHncKBs~I-efS4P>qKP#O;SCls>~FYbHqHu0Hrn^5w93-(~i zU15k(>GH4`%qcayT$amnTcYK$qswykEamCUs|=1NtK>Jh5mO9&IXAgZ?a$xWUGVHui)>6(OR z(WFAG*oJXYa)5DEb%Ne76~jO?l{nC;M5hv6Ds&c4t}H2cuwSW@%$KBHdl9Fgr#cot z_RC3>)Otb_p_|Y_pu6Qx*@|!A#0b4GU;UOP?dcl3yulfaW+4;Wz>;u1)M2bvo}#{l zG<**|V=4PV_#MOid}I^Lz@5lOk>g^&{U(yA@SLOMEQGb4!Sg6TNmh;zr1O2epBD0P zTGKrlk{4lIGsIDpm+XhybLJnTD0SjF$m*szj`9jj>X+FVR2mK9Rh&;kqcIwAk|=#M z;~98i7~(aY+=7xZ!cuU;TqjQ8dc$Eh)&fK4n| zBu3+lQ3(f68Y2fx4;m9AUQF}^!GozWk%MxDgv5&nXSRp|y?FBN?zi*4`DSLn&1Pra zpIXe1MbXH?TI`xpujM{7?>A)%a)0EUO#SS*>4w=JZ%Mf3^maJs^>(&sS{l6s@_SsA zU|D)8B%1Ln<~T*CWGH%-O!JR)9QPEi^xB{?D1vrmDq9qzVwy5+Rx4Rkx@mD^hj#9X z*;_)JpdYOTNC{Cj=nu!DDP7IpN*%4kL_I4P`*Tr~GRZ-CSPk_la#UsyMDMmfVB_4Y ztdH5Yuh!Cs7iWfMx6jqK&RIS4R?i2k=d-<>eHW`OywPl2<`8)CpkajrSmi`kYp(jQ z?&(qPEI9O{O&dZXLJ`6VfMH8}I7WN@5mz`AQ2VMI2jpOnA~$&;7g@8u>tC!lp!EjL zx+{B6P8$w7gkV89$;PsqM#X)GWiAbN0pxZ8-jrR4yAzGT^MVGzLsZO3W%u1c)L@AQ z5|&6tW|-Rx*ZR^Y59eGU#ttB87eF_And>H}?&R5apGFP~XeWXu z+4Ryh4&e-Q=e;EztU9MKS)M<+*02XMDtlM312W46&5pmd@L_L+Ue>ntIq@YkE$<2F z7pxT|zW;gVM|30{wjn+Nu&J*F^wUM6X*bYl-YeiCP zhL1wEhtNI=98GZ^#m01ky>?2*TM**UBC?SD=IkO>5`mB6Pq=+-1=-c#sUsw*mUfb# DQ(OwH diff --git a/app/core/config.py b/app/core/config.py index 748ed0e..8a9f080 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -49,16 +49,15 @@ class SourceInfo(BaseModel): class SourceConfig(BaseModel): """源配置""" - active: str = "akshare" + active: str = "amazingdata" list: Dict[str, SourceInfo] = Field(default_factory=dict) class SourcesConfig(BaseModel): """数据源配置""" stock: SourceConfig = Field(default_factory=lambda: SourceConfig( - active="akshare", + active="amazingdata", list={ - "akshare": SourceInfo(type="http", config={"timeout": "30"}), "amazingdata": SourceInfo(type="sdk", config={ "username": "", "password": "", @@ -70,9 +69,8 @@ class SourcesConfig(BaseModel): } )) futures: SourceConfig = Field(default_factory=lambda: SourceConfig( - active="akshare", + active="amazingdata", list={ - "akshare": SourceInfo(type="http", config={"timeout": "30"}), "amazingdata": SourceInfo(type="sdk", config={ "username": "", "password": "", diff --git a/app/main.py b/app/main.py index 8b96297..42b0ac1 100644 --- a/app/main.py +++ b/app/main.py @@ -988,7 +988,7 @@ ADMIN_HTML = ''' // 切换数据源按钮 html += '
'; - html += ''; + html += ''; html += '
'; document.getElementById('source-status').innerHTML = html; diff --git a/app/models/__pycache__/types.cpython-311.pyc b/app/models/__pycache__/types.cpython-311.pyc index 3a22eac069272c8cf534b00cdbf0d42acc3f1c26..83e9ab06f44826366f3572785eef5f6e7397292e 100644 GIT binary patch delta 54 zcmX@On(^prM&9MTyj%=G(E4;k=K77i8pb@_iMfeYnR)3ci6x1XZH?m@l_$?NKE|lL JImIMa5difn5;gz; delta 50 zcmX@Qn(^RjM&9MTyj%=GaQEtl%;g(-HHTse2h_kbB0N* FA^@?#5YPYs diff --git a/app/models/types.py b/app/models/types.py index 8ec4183..ea18ce5 100644 --- a/app/models/types.py +++ b/app/models/types.py @@ -209,11 +209,11 @@ class DataSourceInfo(BaseModel): class DataSourceStatusData(BaseModel): """数据源状态响应""" stock: DataSourceInfo = Field(default_factory=lambda: DataSourceInfo( - active_source="akshare", + active_source="amazingdata", status=DataSourceStatus.HEALTHY )) futures: DataSourceInfo = Field(default_factory=lambda: DataSourceInfo( - active_source="akshare", + active_source="amazingdata", status=DataSourceStatus.HEALTHY )) diff --git a/app/services/__pycache__/adapter_service.cpython-311.pyc b/app/services/__pycache__/adapter_service.cpython-311.pyc index b4f3389dc2a8ec1c8788d5d52fc5ddf4940e9d03..1baacdd40fd971ca6105847eb0d716be09d80c8c 100644 GIT binary patch delta 2566 zcmZ`)eN0nV6u-BxuV1tjXn_JkA5hvpVEDn80VV@KKtP-kGf=12z9&{oi?HeIV+dA)MwFcZ=m(;8Z4TtE+-+_-~& zWU6BilmV^sY(B$p%9%fO!mJn1Mj@V4g!g=5o`1Mh>G%1HEJ( z!a}^sa!5B+4&n*`AAlb~1Skfm1Snx38lNUt4KXF4^u^}rVbv6n0=Q@L(gAxXfct85mWMLTW2>S|CmGY}0V7wB35@JiM9$Vx!vg4?=D4V%Xd z3nyz$7lYKG)*{>dws=0yjjvY7g}GmIb3kW&mHFbi!gS&al7A z_mWUVQH?Q3WI0BmC$aPtOs4qLoH37hv}@F;Nf&c)`192t4WEqDVrmK9ytc- z4v}Xbew$l^U#I`&9>FE?BY91_rI0f1B0u}l2~-5h1ZZF|Wn1MnONoSPWc;~Cv+^79 zDBYOwrf3)atf(O00!MC?YHiYh=>$5dJHv|XEEJkHSkJKzn06zwS}C07v2`ZNI%`+)&@J9v+Z@hm z2Wv0|X(U~N!kZefh<@eG;cXiAv)4`E@TTjeWa{*l04YdzDdo{d#xzq$bc1GzhaYYB z`{{_!ZcH1%Cy?Zz=X}d?GPCK@zlxaH>7P$KGSg|NzbMD4sidFeo;VLrkNXR82K~}s zk5lMFe-^gW)tT9Jftb^%#ZThbqAq3+VhMDD>c{l+gHW%^v0@`y9W6+Kg+}ISL=GxqQlhlAbWPMzG%RhN z>WwOviA<%EGGQrfsg*wTL#V8&k!alQ^jwK2uAzMS}S`lb|l`{tZmn=Z4=hE8+OOh!SC&!3A=}`dG5ze^;7E-s~O%yz`3s> zvp2(k7X9GCgJ+2*8{>z|e!@dX8?u`$+&3lJjRxH~j~P8~ur$u+$5YZ9i}~>)9yrB3 zaON+wv&kikzA4#wDHQ`|G#FPp(dFEv<_hkz-wJevzIn0n@?s9?S`2hu2BWVyEiHB2 zmArW^)!dbu>=uE)>a@00=W$mBeT&C@)x$BtRUZc0uhG?fOC8lOd|0jZ703zIx?K*0 zmF=5@+vVWSN4m_T9T`5bFLtVX_I1FCnimJ!pcl6q;8x7OkH^OP+jgAm7B(o=HQH!=;nx>3C1$-+zXb2yEB?0lvUY AzyJUM delta 3036 zcmZ`*eM}t36`#4?y$|l-4!93|eB8kS$6`3Xf)hKG1`>y08o9fGYur&4JE2u7tEsI>XjwrkYsrsNqO%Y5PFaO^)~D|^YdF}oLcx?SP2g$r$|KZlERaUOw@v| zzM=6jS@;R|ah3**(3>cY-l3mZob(_3{xVAxOL_B#G&;!N7bf8jM&_7&NU}<{*U@Hu z%rdK#FBM33nlH5PwZa0)Ar)%BBB@w1NzQrW{|VOlTti57Pr2BV)rAWy$4;&sd*iQ{ z-(Q=3ZS9RyE9Wk#rtn0HSuE3eVY}1K*hEb1>~#^5O7%au#Kc$ddnCbNLN(`Ppbj z>|)Q7x%;NMJ7ez7nvttw+38*OR4kYF|Iu4D-*7H8AIe&d)p=RO-~@j%e=fRY_Gg`_ zr0ma*EoWP^oWWC&6@&oeV8A$RbNiOezN}+iPdwAuo1wolP2$RhC(OqUGfiMsGe8}H z2v85O6#&kGG%yfNkD6DFDJ85-r8e+k)g0F2H`OoNi3bFLJ5(X6$nlis6_p#46=H=^ zy2F}>y>zEF_~aLW)xtnD5;#551prp5d|Xz-Di=90MxJ5)l#zj7Yb>PBTesn%1=V_l zZ_)Mv=JtyPx@v7xa1v_X-f(P6-bqN3s75_1h?l)l7RPu9m>}}H_CC7g2w)%m zx1$v&7wQV5{LG8Mqq$^*qTC)IC?fr!%{VnFg;^47++J8W2(TOA%K$?FdjK9%&*jv^ z)LsTDu#D}M?bP9Z1|M74?f#IT39!MV_Lox8#F#(l?;ZYr?I1w1`C3;vUUk6^zWG#Z9h3>lDV6w;ubtAtDb0Ax_ z>WfDb;kcZFdIYD#O#X4{4m?k9m!8JKg>+eu;aLc)eog zP=fb)SotTH1f*-A{t)o*lc8fEO-$s%vZJ{{eq~EV;Vp5iB zO4;r|t!WbO^R}0|H9pUGxp!HKdk@N-*4Q0`=%LyGE}{3B4+ot#db!qvt<0(tn^dG- zIl7R0wOgOioGGCpw2zH}V}aJGBlrk8BlezR!VUUq_CHkU8cUx+29fMpbCQDEUuq8+ynTWH|+k5_OOen$bGi_MqzEHu$C5Y{ryM}&I!`RF}#Cd zu7xi(`~lC&6LV0Ps@DPhNjwk%?=he4sM ztqnyit#nCZ37G|YH?3|v)Jq(U-f;cIl4D~sArSC-_GlJtw(39Tj(AdxJf!K>O-dAM2FADfv5%9Sn L;InzZ!Uui;$gw7k delta 124 zcmZ3KxIU40IWI340}!lhUz=IKkynjHDbB?zCapNWJijQrxF9h(H6}N)C_A-8Hzlzo zQMWj?s4R1{3yTpK2YX_6aYkZM>gFcC=}ZEW7d3*fXarvr2)QBfu7ItnP?!?^0 zs?5Cfl*E$6&G!6_OaihOHNvlGgkKbhxFQg7K_FuD1;KOyf!K>O-dAM2FADfv5%9Sn P;4}HIp#J7EB}RS#ZTcrA delta 124 zcmaEr^gW4pIWI340}#w=Uz=I9k(ZH0DaOSrCapNWJijQrxF9h(H6}N)C_A-8Hzlzo zQMWj?s4R1{JPSKF2YX_6aYkZM>SiPUMkWEtiyFaKG=eV*gj^8_xgZd-`LJNRfI#F$ c8TTtP?iU3-t_XNs5b&6MK~R5lrV=AR0CKo1H~;_u diff --git a/app/services/__pycache__/test_service.cpython-311.pyc b/app/services/__pycache__/test_service.cpython-311.pyc index ad5711c172841d2c181944a53cfa015d4a89dcd1..a6d4cc90a801c51a0b304eb5322f3c215e2d16c7 100644 GIT binary patch delta 110 zcmew{m9b$OBj0jfUM>b8xc+oQCR^}EzBK}jnv-`6$jj#@R%Pa;rzDmnzTeY%i#ahT z=N5M&LU8j(fn)6K{tvVbZm1b=-Y22J$Y?nEzN8Va;R6ND6(%2;nbhSzuue`86W?qq Hb=Uv^zHcZY delta 92 zcmZpe#`t3@Bj0jfUM>b8cv7`C^IOnHzBK}js*`sM$ctweXCxM-zTeY%i#ahT=N5Y+ tL}2qzfn)4k-Vd}5Zm8+s&^4OuEvY>Djl?fT-O0}-jTm(|OG!O5002sUBR&8C diff --git a/app/services/adapter_service.py b/app/services/adapter_service.py index 1f0444e..480cfef 100644 --- a/app/services/adapter_service.py +++ b/app/services/adapter_service.py @@ -8,7 +8,7 @@ from app.models import ( AdapterListData, AdapterInfo, AdapterStatus, AdapterToggleRequest, AdapterConfigUpdateRequest ) -from app.adapters import DataSourceAdapter, AKShareAdapter, AmazingDataAdapter +from app.adapters import DataSourceAdapter, AmazingDataAdapter from app.core.logger import info, error @@ -35,26 +35,6 @@ class AdapterService: def _register_builtin_adapters(self): """注册内置适配器""" - # 注册AKShare适配器 - self.register_adapter("akshare", lambda: AKShareAdapter()) - - # 设置AKShare元数据 - self.metadata["akshare"] = { - "name": "akshare", - "type": "http", - "version": "1.0.0", - "description": "AKShare 开源金融数据接口(无需Token)", - "updated_at": datetime.now() - } - - # AKShare默认配置(无需token) - self.configs["akshare"] = { - "enabled": True, - "config": { - "timeout": 30 - } - } - # 注册星耀数智(AmazingData)适配器 self.register_adapter("amazingdata", lambda: AmazingDataAdapter()) @@ -170,7 +150,7 @@ class AdapterService: elif asset_class == "futures": active_name = self.config.sources.futures.active else: - active_name = "akshare" # 默认 + active_name = "amazingdata" # 默认 print(f"Using adapter: {active_name}") # 返回已激活的适配器实例 @@ -200,7 +180,6 @@ class AdapterService: self.factories[name] = factory async def _connect_adapter(self, name: str): - name = "amazingdata" # 强制使用 amazingdata 适配器进行测试 """连接适配器""" from app.core.config import get_config @@ -223,12 +202,8 @@ class AdapterService: # 从 config.json 获取最新配置(与文件同步) file_config = get_config() - print(f"226 Using file config: {file_config}, adapter name: {name}") - if name == "akshare" and False: # 暂时不使用 akshare 适配器 - source_info = file_config.sources.stock.list.get("akshare") - adapter_config = dict(source_info.config) if source_info else {} - adapter_config["timeout"] = int(adapter_config.get("timeout", 30)) - elif name == "amazingdata": + print(f"Using file config: {file_config}, adapter name: {name}") + if name == "amazingdata": # 优先使用 stock 下的 amazingdata 配置 source_info = file_config.sources.stock.list["amazingdata"] adapter_config = dict(source_info.config) if source_info else {} diff --git a/app/services/futures_service.py b/app/services/futures_service.py index dd0b068..a49ac1a 100644 --- a/app/services/futures_service.py +++ b/app/services/futures_service.py @@ -61,10 +61,10 @@ class FuturesService: # 确保适配器已连接 adapter = adapter_service.get_active_adapter("futures") if not adapter: - # 尝试连接akshare + # 尝试连接 amazingdata loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - loop.run_until_complete(adapter_service._connect_adapter("akshare")) + loop.run_until_complete(adapter_service._connect_adapter("amazingdata")) loop.close() adapter = adapter_service.get_active_adapter("futures") @@ -162,7 +162,7 @@ class FuturesService: # 确保适配器已连接 adapter = adapter_service.get_active_adapter("futures") if not adapter: - asyncio.run(adapter_service._connect_adapter("akshare")) + asyncio.run(adapter_service._connect_adapter("amazingdata")) adapter = adapter_service.get_active_adapter("futures") if not adapter: diff --git a/app/services/stock_service.py b/app/services/stock_service.py index 2607e30..aa0b9f2 100644 --- a/app/services/stock_service.py +++ b/app/services/stock_service.py @@ -71,10 +71,10 @@ class StockService: # 确保适配器已连接 adapter = adapter_service.get_active_adapter("stock") if not adapter: - # 尝试连接akshare + # 尝试连接 amazingdata loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - loop.run_until_complete(adapter_service._connect_adapter("akshare")) + loop.run_until_complete(adapter_service._connect_adapter("amazingdata")) loop.close() adapter = adapter_service.get_active_adapter("stock") @@ -187,7 +187,7 @@ class StockService: # 确保适配器已连接 adapter = adapter_service.get_active_adapter("stock") if not adapter: - asyncio.run(adapter_service._connect_adapter("akshare")) + asyncio.run(adapter_service._connect_adapter("amazingdata")) adapter = adapter_service.get_active_adapter("stock") if not adapter: diff --git a/app/services/test_service.py b/app/services/test_service.py index 966e265..5a90531 100644 --- a/app/services/test_service.py +++ b/app/services/test_service.py @@ -166,10 +166,10 @@ class TestService: name="切换数据源", method="POST", path="/v1/admin/source/switch", - description="切换到指定数据源(akshare)", + description="切换到指定数据源(amazingdata)", body={ "asset_class": "all", - "source": "akshare", + "source": "amazingdata", "sync_backfill": False } ), @@ -229,7 +229,7 @@ class TestService: path="/v1/admin/adapters/toggle", description="启用或禁用适配器", body={ - "name": "akshare", + "name": "amazingdata", "enable": True } ), @@ -240,7 +240,7 @@ class TestService: path="/v1/admin/adapters/config", description="更新适配器配置参数", body={ - "name": "akshare", + "name": "amazingdata", "config": { "timeout": "60" } diff --git a/config.json b/config.json index 5b252ed..674c294 100644 --- a/config.json +++ b/config.json @@ -21,12 +21,6 @@ "stock": { "active": "amazingdata", "list": { - "akshare": { - "type": "http", - "config": { - "timeout": "30" - } - }, "amazingdata": { "type": "sdk", "config": { @@ -41,14 +35,8 @@ } }, "futures": { - "active": "akshare", + "active": "amazingdata", "list": { - "akshare": { - "type": "http", - "config": { - "timeout": "30" - } - }, "amazingdata": { "type": "sdk", "config": { diff --git a/requirements.txt b/requirements.txt index 3639265..0a604a2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,9 +15,6 @@ numpy==2.1.3 numba==0.61.0 scipy==1.15.0 -# Data Source -akshare>=1.12.0 - # Configuration pydantic==2.10.0 pydantic-settings==2.6.1 diff --git a/scripts/sync_data.py b/scripts/sync_data.py index 7b0fc06..19c0504 100644 --- a/scripts/sync_data.py +++ b/scripts/sync_data.py @@ -8,7 +8,7 @@ from argparse import ArgumentParser # 添加项目根目录到路径 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -from app.adapters import AKShareAdapter +from app.adapters import AmazingDataAdapter from app.repositories import SessionLocal from app.repositories.stock_repository import StockRepository from app.repositories.futures_repository import FuturesRepository @@ -31,7 +31,7 @@ def is_stock(symbol: str) -> bool: return symbol.endswith(".SH") or symbol.endswith(".SZ") or symbol.endswith(".BJ") -async def sync_stocks(adapter: AKShareAdapter, db): +async def sync_stocks(adapter: AmazingDataAdapter, db): """同步股票基础信息""" info("Syncing stock basic info...") @@ -65,7 +65,7 @@ async def sync_stocks(adapter: AKShareAdapter, db): raise -async def sync_futures(adapter: AKShareAdapter, db): +async def sync_futures(adapter: AmazingDataAdapter, db): """同步期货基础信息""" info("Syncing futures basic info...") @@ -114,7 +114,7 @@ async def sync_futures(adapter: AKShareAdapter, db): raise -async def sync_calendar(adapter: AKShareAdapter, db, start: str, end: str): +async def sync_calendar(adapter: AmazingDataAdapter, db, start: str, end: str): """同步交易日历""" info(f"Syncing trading calendar from {start} to {end}...") @@ -139,7 +139,7 @@ async def sync_calendar(adapter: AKShareAdapter, db, start: str, end: str): raise -async def sync_klines(adapter: AKShareAdapter, db, symbol: str, start: str, end: str, freq: str): +async def sync_klines(adapter: AmazingDataAdapter, db, symbol: str, start: str, end: str, freq: str): """同步K线数据""" info(f"Syncing {freq} klines for {symbol} from {start} to {end}...") @@ -197,8 +197,8 @@ async def main(): args = parser.parse_args() - # 初始化适配器(AKShare 无需 token) - adapter = AKShareAdapter() + # 初始化适配器 + adapter = AmazingDataAdapter() await adapter.connect({"timeout": 30}) # 创建数据库会话 diff --git a/scripts/test_extended_fields.py b/scripts/test_extended_fields.py index 69b5b1e..e2b1899 100644 --- a/scripts/test_extended_fields.py +++ b/scripts/test_extended_fields.py @@ -15,14 +15,21 @@ import asyncio sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from datetime import datetime, timedelta -from app.adapters.akshare_adapter import AKShareAdapter +from app.adapters.amazingdata_adapter import AmazingDataAdapter from app.core.logger import info, error async def test_fetch_daily_klines(): """测试获取日K线数据(包含扩展字段)""" - adapter = AKShareAdapter() - await adapter.connect({}) + adapter = AmazingDataAdapter() + await adapter.connect({ + "username": os.getenv("AMAZINGDATA_USERNAME", ""), + "password": os.getenv("AMAZINGDATA_PASSWORD", ""), + "host": os.getenv("AMAZINGDATA_HOST", ""), + "port": int(os.getenv("AMAZINGDATA_PORT", "8600")), + "local_path": "./amazing_data_cache/", + "use_local_cache": True + }) # 测试股票: 平安银行 000001.SZ symbol = "000001.SZ" @@ -63,28 +70,6 @@ async def test_fetch_daily_klines(): info("") info("扩展字段:") info(f" 交易日: {first_kline.trade_date}") - info(f" 是否涨停: {first_kline.is_limit_up}") - info(f" 是否跌停: {first_kline.is_limit_down}") - info(f" 总市值: {first_kline.total_market_cap:,.0f} 元" if first_kline.total_market_cap else " 总市值: N/A") - info(f" 流通市值: {first_kline.float_market_cap:,.0f} 元" if first_kline.float_market_cap else " 流通市值: N/A") - info(f" 机构持仓占比: {first_kline.inst_holding_ratio:.4f}%" if first_kline.inst_holding_ratio else " 机构持仓占比: N/A") - info(f" 可交易日数: {first_kline.trading_days}") - - # 统计涨停跌停情况 - limit_up_count = sum(1 for k in klines if k.is_limit_up) - limit_down_count = sum(1 for k in klines if k.is_limit_down) - - info(f"\n统计信息:") - info(f" 涨停天数: {limit_up_count}") - info(f" 跌停天数: {limit_down_count}") - - # 验证数据完整性 - all_have_market_cap = all(k.total_market_cap > 0 for k in klines) - all_have_trading_days = all(k.trading_days > 0 for k in klines) - - info(f"\n数据完整性检查:") - info(f" 所有记录都有市值数据: {all_have_market_cap}") - info(f" 所有记录都有交易日数: {all_have_trading_days}") return True diff --git a/tests/test_xysz_integration.py b/tests/test_xysz_integration.py index 7359d97..0f5d94b 100644 --- a/tests/test_xysz_integration.py +++ b/tests/test_xysz_integration.py @@ -38,8 +38,7 @@ class TestAPIEndpoints(unittest.TestCase): 'message': 'success', 'data': { 'adapters': [ - {'name': 'amazingdata', 'status': 'active', 'type': 'stock'}, - {'name': 'akshare', 'status': 'active', 'type': 'stock'} + {'name': 'amazingdata', 'status': 'active', 'type': 'stock'} ] } }).encode()