"""
Data collection service - wraps the original collection scripts.
"""
import json
import logging
import sys
import os
from datetime import datetime
from typing import Dict, List, Optional

logger = logging.getLogger(__name__)

# Project root: two directory levels above this file's directory
# (buffer_platform/app/services -> buffer_platform). It is put on sys.path
# so the collector module can be imported by its top-level name.
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
if PROJECT_ROOT not in sys.path:
    sys.path.insert(0, PROJECT_ROOT)
    logger.info(f"已添加项目根目录到sys.path: {PROJECT_ROOT}")


def fetch_symbol_data(
    symbol: str,
    data_type: str = "futures",
    periods: Optional[List[str]] = None,
    max_workers: int = 2,
) -> Dict:
    """Fetch multi-timeframe data for a single symbol.

    Args:
        symbol: Symbol handed to the collector function.
        data_type: ``"stock"`` selects ``collect_stock_data``; any other
            value selects ``collect_futures_data``.
        periods: Optional timeframe keys to keep. ``None`` or an empty list
            returns the collector result unfiltered. Requested keys that the
            result does not contain are skipped.
        max_workers: Accepted for interface compatibility; this function
            does not use it.

    Returns:
        The collector's result dict (with ``"timeframes"`` narrowed to
        ``periods`` when given). If anything fails, a placeholder dict with
        the keys ``symbol``, ``type``, ``current_price`` (``None``),
        ``timestamp``, ``timeframes`` (empty) and ``error`` (the exception
        message). This function does not raise for collection failures.
    """
    try:
        # Imported inside the try so that a missing or broken collector
        # module is reported through the error dict below.
        from futures_data_collector import collect_futures_data, collect_stock_data

        collect = collect_stock_data if data_type == "stock" else collect_futures_data
        result = collect(symbol)

        # NOTE(review): assumes the collector is meant to return a dict;
        # a None result is treated as a failure so callers always get a dict.
        if result is None:
            raise ValueError("collector returned no data")

        if periods:
            available = result.get("timeframes", {})
            # Build a new dict instead of mutating the collector's object,
            # in case the collector keeps a reference to what it returned.
            result = {
                **result,
                "timeframes": {p: available[p] for p in periods if p in available},
            }
        return result
    except Exception as e:
        # logger.exception logs at ERROR level and attaches the traceback.
        logger.exception("采集 %s 数据失败: %s", symbol, e)
        return {
            "symbol": symbol,
            "type": data_type,
            "current_price": None,
            "timestamp": datetime.now().isoformat(),
            "timeframes": {},
            "error": str(e),
        }


def fetch_batch(
    symbols: List[str],
    data_type: str = "futures",
    periods: Optional[List[str]] = None,
    max_workers: int = 2,
) -> Dict[str, Dict]:
    """Fetch data for several symbols one after another.

    Symbols are processed serially (to avoid excessive concurrency); each
    one is delegated to ``fetch_symbol_data`` with the same ``data_type``,
    ``periods`` and ``max_workers``.

    Returns:
        A dict mapping each symbol to its ``fetch_symbol_data`` result, in
        the order the symbols were given. A symbol listed more than once is
        fetched again and its entry overwritten.
    """
    results: Dict[str, Dict] = {}
    for sym in symbols:
        logger.info(f"开始采集 {sym} ...")
        results[sym] = fetch_symbol_data(sym, data_type, periods, max_workers)
    return results