""" 数据采集服务 - 包装原始采集脚本 """ import json import logging import sys import os from datetime import datetime from typing import Dict, List, Optional logger = logging.getLogger(__name__) # 获取原始采集脚本路径 (buffer_platform/app/services -> buffer_platform -> parent = market_data_colector_platform) SCRIPT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..')) if SCRIPT_DIR not in sys.path: sys.path.insert(0, SCRIPT_DIR) logger.info(f"已添加采集脚本路径到sys.path: {SCRIPT_DIR}") def fetch_symbol_data( symbol: str, data_type: str = "futures", periods: Optional[List[str]] = None, max_workers: int = 2, ) -> Dict: """ 获取单个品种的多周期数据。 返回格式: { "symbol": "SN2504", "type": "futures", "current_price": 12345.0, "timestamp": "2025-01-15T10:30:00+08:00", "timeframes": { "5min": [{"datetime": ..., "open": ..., ...}, ...], ... } } """ try: from futures_data_collector import collect_futures_data, collect_stock_data if data_type == "stock": result = collect_stock_data(symbol) else: result = collect_futures_data(symbol) # 如果指定了周期,只保留需要的 if periods: filtered = {} for p in periods: if p in result.get("timeframes", {}): filtered[p] = result["timeframes"][p] result["timeframes"] = filtered return result except Exception as e: logger.error(f"采集 {symbol} 数据失败: {e}") return { "symbol": symbol, "type": data_type, "current_price": None, "timestamp": datetime.now().isoformat(), "timeframes": {}, "error": str(e), } def fetch_batch( symbols: List[str], data_type: str = "futures", periods: Optional[List[str]] = None, max_workers: int = 2, ) -> Dict[str, Dict]: """批量获取多个品种数据(串行,避免过度并发)""" results = {} for sym in symbols: logger.info(f"开始采集 {sym} ...") results[sym] = fetch_symbol_data(sym, data_type, periods, max_workers) return results