""" AmazingData 数据服务平台 - 数据服务 封装现有的 data_service.py 功能 """ import sys import os import json import threading from datetime import datetime, date from typing import Dict, List, Optional, Any, Callable from pathlib import Path # 添加父目录到路径以导入现有模块 parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.insert(0, parent_dir) from backend.config import settings class AmazingDataPlatformService: """AmazingData 数据平台服务""" def __init__(self): self.adapter = None self.connected = False self._lock = threading.Lock() self.data_save_path = settings.DATA_SAVE_PATH def connect(self) -> bool: """连接 AmazingData""" try: import AmazingData as ad ret = ad.login( username=settings.AMAZING_DATA_USERNAME, password=settings.AMAZING_DATA_PASSWORD, host=settings.AMAZING_DATA_HOST, port=settings.AMAZING_DATA_PORT ) if ret: self.connected = True self.adapter = ad return True return False except Exception as e: print(f"Connect error: {e}") return False def disconnect(self) -> bool: """断开连接""" try: if self.connected and self.adapter: import AmazingData as ad ad.logout(settings.AMAZING_DATA_USERNAME) self.connected = False return True except Exception as e: print(f"Disconnect error: {e}") return False def get_single_kline( self, code: str, trading_day: Optional[str] = None, period: str = "day", save_path: Optional[str] = None ) -> Dict[str, Any]: """获取单只股票/期货K线数据""" try: import AmazingData as ad if not self.connected: self.connect() # 获取历史数据 history = ad.HistoryData() # 转换周期 period_map = { "day": ad.constant.Period.day.value, "min1": ad.constant.Period.min1.value, "min5": ad.constant.Period.min5.value, "min15": ad.constant.Period.min15.value, "min30": ad.constant.Period.min30.value, "min60": ad.constant.Period.min60.value, } period_value = period_map.get(period, ad.constant.Period.day.value) # 如果没有指定交易日,使用最近交易日 if not trading_day: trading_day = self._get_latest_trading_day() # 获取数据 start_date = f"{trading_day[:4]}-{trading_day[4:6]}-{trading_day[6:8]}" end_date = start_date df = history.get_kline_data( code=code, period=period_value, start_date=start_date, end_date=end_date ) # 转换数据 data_list = [] if df is not None and len(df) > 0: data_list = df.to_dict('records') # 保存文件 if save_path is None: save_path = os.path.join(self.data_save_path, "single") os.makedirs(save_path, exist_ok=True) safe_code = code.replace('.', '_') filename = f"{safe_code}_{trading_day}_{period}.json" filepath = os.path.join(save_path, filename) result = { "code": code, "trading_day": trading_day, "period": period, "data": data_list, "count": len(data_list) } with open(filepath, 'w', encoding='utf-8') as f: json.dump(result, f, ensure_ascii=False, indent=2) result["file_path"] = filepath return result except Exception as e: return {"error": str(e), "code": code, "trading_day": trading_day or "", "data": [], "count": 0} def batch_get_stock_kline( self, codes: Optional[List[str]] = None, trading_days: Optional[List[str]] = None, save_path: Optional[str] = None, batch_size: int = 100 ) -> Dict[str, str]: """批量获取股票K线数据""" try: import AmazingData as ad if not self.connected: self.connect() if save_path is None: save_path = os.path.join(self.data_save_path, "stock") os.makedirs(save_path, exist_ok=True) # 获取股票代码 if codes is None: base = ad.BaseData() codes = base.get_code_list("EXTRA_STOCK_A") # 获取交易日 if trading_days is None: trading_days = [self._get_latest_trading_day()] result_files = {} history = ad.HistoryData() for trading_day in trading_days: start_date = f"{trading_day[:4]}-{trading_day[4:6]}-{trading_day[6:8]}" day_data = {} for i, code in enumerate(codes): try: period_value = ad.constant.Period.day.value df = history.get_kline_data( code=code, period=period_value, start_date=start_date, end_date=start_date ) if df is not None and len(df) > 0: day_data[code] = df.to_dict('records') except Exception: continue # 保存文件 filename = f"kline_{trading_day}.json" filepath = os.path.join(save_path, filename) with open(filepath, 'w', encoding='utf-8') as f: json.dump(day_data, f, ensure_ascii=False, indent=2) result_files[trading_day] = filepath return result_files except Exception as e: return {"error": str(e)} def batch_get_future_kline( self, underlying_codes: Optional[List[str]] = None, use_main_contract: bool = True, trading_days: Optional[List[str]] = None, save_path: Optional[str] = None ) -> Dict[str, str]: """批量获取期货K线数据""" try: import AmazingData as ad if not self.connected: self.connect() if save_path is None: save_path = os.path.join(self.data_save_path, "future") os.makedirs(save_path, exist_ok=True) # 获取交易日 if trading_days is None: trading_days = [self._get_latest_trading_day()] result_files = {} history = ad.HistoryData() for trading_day in trading_days: start_date = f"{trading_day[:4]}-{trading_day[4:6]}-{trading_day[6:8]}" all_data = [] # 获取期货代码 if underlying_codes: codes = [] for uc in underlying_codes: if '.' in uc: codes.append(uc) else: # 简单主力合约识别 codes.append(f"{uc}2605.SHF") else: base = ad.BaseData() codes = base.get_code_list("EXTRA_FUTURE")[:50] # 限制数量 for code in codes: try: period_value = ad.constant.Period.day.value df = history.get_kline_data( code=code, period=period_value, start_date=start_date, end_date=start_date ) if df is not None and len(df) > 0: all_data.extend(df.to_dict('records')) except Exception: continue # 保存文件 filename = f"futures_{trading_day}.json" filepath = os.path.join(save_path, filename) result = { "metadata": { "source": "AmazingData", "trading_day": trading_day, "fetch_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S'), "total_codes": len(codes), "total_records": len(all_data) }, "data": all_data } with open(filepath, 'w', encoding='utf-8') as f: json.dump(result, f, ensure_ascii=False, indent=2) result_files[trading_day] = filepath return result_files except Exception as e: return {"error": str(e)} def get_stock_codes(self) -> List[str]: """获取股票代码列表""" try: import AmazingData as ad if not self.connected: self.connect() base = ad.BaseData() return base.get_code_list("EXTRA_STOCK_A") except Exception: return [] def get_future_codes(self) -> List[str]: """获取期货代码列表""" try: import AmazingData as ad if not self.connected: self.connect() base = ad.BaseData() return base.get_code_list("EXTRA_FUTURE") except Exception: return [] def _get_latest_trading_day(self) -> str: """获取最近交易日""" today = date.today() return today.strftime('%Y%m%d') def test_connection(self) -> Dict[str, Any]: """测试连接""" try: import AmazingData as ad ret = ad.login( username=settings.AMAZING_DATA_USERNAME, password=settings.AMAZING_DATA_PASSWORD, host=settings.AMAZING_DATA_HOST, port=settings.AMAZING_DATA_PORT ) if ret: ad.logout(settings.AMAZING_DATA_USERNAME) return {"success": True, "message": "Connection successful"} return {"success": False, "message": "Login failed"} except Exception as e: return {"success": False, "message": str(e)} # 全局服务实例 data_service = AmazingDataPlatformService()