You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
323 lines
11 KiB
323 lines
11 KiB
"""
|
|
AmazingData 数据服务平台 - 数据服务
|
|
封装现有的 data_service.py 功能
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import json
|
|
import threading
|
|
from datetime import datetime, date
|
|
from typing import Dict, List, Optional, Any, Callable
|
|
from pathlib import Path
|
|
|
|
# Put the directory three levels above this file at the front of sys.path so
# that the existing modules can be imported.
_this_file = os.path.abspath(__file__)
parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(_this_file)))
sys.path.insert(0, parent_dir)
|
|
|
|
from backend.config import settings
|
|
|
|
|
|
class AmazingDataPlatformService:
    """AmazingData data platform service.

    Wraps the ``AmazingData`` SDK: logs in with the credentials held in
    ``settings``, fetches K-line data and writes the results as JSON files
    below ``settings.DATA_SAVE_PATH``.
    """

    # Period names accepted by get_single_kline(); each one is also the name
    # of the attribute looked up on ``ad.constant.Period``.
    _PERIOD_NAMES = ("day", "min1", "min5", "min15", "min30", "min60")

    # Suffix appended to a bare underlying code (one without a '.') in
    # batch_get_future_kline().
    # NOTE(review): this pins every bare underlying to one fixed contract and
    # exchange suffix regardless of ``use_main_contract`` -- confirm intent.
    _DEFAULT_FUTURE_CONTRACT_SUFFIX = "2605.SHF"

    def __init__(self):
        self.adapter = None       # the AmazingData module once login succeeded
        self.connected = False    # True between a successful login and logout
        self._lock = threading.Lock()  # not acquired by any method of this class
        self.data_save_path = settings.DATA_SAVE_PATH

    # ------------------------------------------------------------------
    # Connection handling
    # ------------------------------------------------------------------
    def connect(self) -> bool:
        """Log in to AmazingData.

        Returns:
            True when ``ad.login`` returned a truthy value, False when it
            returned a falsy value or raised (the error is printed).
        """
        try:
            import AmazingData as ad

            ret = ad.login(
                username=settings.AMAZING_DATA_USERNAME,
                password=settings.AMAZING_DATA_PASSWORD,
                host=settings.AMAZING_DATA_HOST,
                port=settings.AMAZING_DATA_PORT,
            )
        except Exception as e:
            print(f"Connect error: {e}")
            return False

        if not ret:
            return False
        self.connected = True
        self.adapter = ad
        return True

    def disconnect(self) -> bool:
        """Log out if currently connected.

        Returns:
            True when no error occurred (including when there was nothing to
            disconnect), False when ``ad.logout`` raised.
        """
        try:
            if self.connected and self.adapter:
                import AmazingData as ad

                ad.logout(settings.AMAZING_DATA_USERNAME)
                self.connected = False
            return True
        except Exception as e:
            print(f"Disconnect error: {e}")
            return False

    def _ensure_connected(self) -> None:
        """Log in on demand; raise ConnectionError when the login fails."""
        if not self.connected and not self.connect():
            raise ConnectionError("AmazingData login failed")

    # ------------------------------------------------------------------
    # Pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _to_iso_date(trading_day: str) -> str:
        """Convert a ``YYYYMMDD...`` trading day into ``YYYY-MM-DD``.

        Only the first eight characters are used. Raises ValueError when they
        are not eight digits forming a real calendar date, instead of silently
        building a malformed query date.
        """
        head = trading_day[:8]
        if len(head) != 8 or not head.isdigit():
            raise ValueError(
                f"trading_day must start with YYYYMMDD, got {trading_day!r}"
            )
        # strptime is used purely to reject impossible dates such as 20240230.
        datetime.strptime(head, "%Y%m%d")
        return f"{head[:4]}-{head[4:6]}-{head[6:8]}"

    @staticmethod
    def _previous_weekday(day: date) -> date:
        """Return *day*, moved back to Friday when it falls on a weekend."""
        overshoot = day.weekday() - 4  # Saturday -> 1, Sunday -> 2
        if overshoot <= 0:
            return day
        return date.fromordinal(day.toordinal() - overshoot)

    @staticmethod
    def _json_default(value: Any) -> Any:
        """``json`` fallback for values the encoder cannot handle natively.

        Dates/datetimes become ISO-8601 strings; objects exposing ``item()``
        are unwrapped through it (presumably NumPy scalars -- TODO confirm
        what the SDK's frames actually contain). Anything else still raises
        TypeError.
        """
        if isinstance(value, (datetime, date)):
            return value.isoformat()
        unwrap = getattr(value, "item", None)
        if callable(unwrap):
            return unwrap()
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )

    @classmethod
    def _write_json(cls, filepath: str, payload: Any) -> None:
        """Serialize *payload* and write it to *filepath* as UTF-8 JSON."""
        # Serialize first so a serialization error cannot leave a truncated file.
        text = json.dumps(
            payload, ensure_ascii=False, indent=2, default=cls._json_default
        )
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(text)

    @classmethod
    def _resolve_period(cls, ad: Any, period: str) -> Any:
        """Map a period name to the SDK constant; unknown names mean "day"."""
        name = period if period in cls._PERIOD_NAMES else "day"
        return getattr(ad.constant.Period, name).value

    @staticmethod
    def _fetch_records(
        history: Any, code: str, period_value: Any, query_date: str
    ) -> List[Dict[str, Any]]:
        """Fetch one code's K-line rows for a single day as a list of dicts."""
        df = history.get_kline_data(
            code=code,
            period=period_value,
            start_date=query_date,
            end_date=query_date,
        )
        if df is None or len(df) == 0:
            return []
        return df.to_dict('records')

    @staticmethod
    def _report_failures(label: str, trading_day: str, failures: List[tuple]) -> None:
        """Print a one-line summary of the codes skipped for a trading day."""
        if not failures:
            return
        first_code, first_error = failures[0]
        print(
            f"{label}: {len(failures)} code(s) failed for {trading_day}; "
            f"first: {first_code}: {first_error}"
        )

    def _list_codes(self, category: str) -> List[str]:
        """Return the SDK code list for *category*, or [] on any error."""
        try:
            import AmazingData as ad

            self._ensure_connected()
            return ad.BaseData().get_code_list(category)
        except Exception as e:
            print(f"Get code list error ({category}): {e}")
            return []

    # ------------------------------------------------------------------
    # Data retrieval
    # ------------------------------------------------------------------
    def get_single_kline(
        self,
        code: str,
        trading_day: Optional[str] = None,
        period: str = "day",
        save_path: Optional[str] = None
    ) -> Dict[str, Any]:
        """Fetch K-line data for a single stock/future and save it as JSON.

        Args:
            code: Instrument code; '.' is replaced by '_' in the file name.
            trading_day: Day as ``YYYYMMDD``; defaults to the latest trading day.
            period: One of day/min1/min5/min15/min30/min60; any other value
                is queried as "day" (the file name and result keep the value
                passed in).
            save_path: Target directory; defaults to ``<data_save_path>/single``.

        Returns:
            Dict with ``code``, ``trading_day``, ``period``, ``data``,
            ``count`` and ``file_path``. On any failure a dict with ``error``
            and empty ``data`` is returned instead of raising.
        """
        try:
            import AmazingData as ad

            self._ensure_connected()

            history = ad.HistoryData()
            period_value = self._resolve_period(ad, period)

            if not trading_day:
                trading_day = self._get_latest_trading_day()
            query_date = self._to_iso_date(trading_day)

            data_list = self._fetch_records(history, code, period_value, query_date)

            if save_path is None:
                save_path = os.path.join(self.data_save_path, "single")
            os.makedirs(save_path, exist_ok=True)

            safe_code = code.replace('.', '_')
            filename = f"{safe_code}_{trading_day}_{period}.json"
            filepath = os.path.join(save_path, filename)

            result = {
                "code": code,
                "trading_day": trading_day,
                "period": period,
                "data": data_list,
                "count": len(data_list)
            }
            # file_path is added after writing, so it is not stored in the file.
            self._write_json(filepath, result)
            result["file_path"] = filepath
            return result

        except Exception as e:
            return {"error": str(e), "code": code, "trading_day": trading_day or "", "data": [], "count": 0}

    def batch_get_stock_kline(
        self,
        codes: Optional[List[str]] = None,
        trading_days: Optional[List[str]] = None,
        save_path: Optional[str] = None,
        batch_size: int = 100
    ) -> Dict[str, str]:
        """Fetch daily K-lines for many stocks, one JSON file per trading day.

        Args:
            codes: Stock codes; defaults to the SDK list "EXTRA_STOCK_A".
            trading_days: Days as ``YYYYMMDD``; defaults to the latest trading day.
            save_path: Target directory; defaults to ``<data_save_path>/stock``.
            batch_size: Accepted for interface compatibility; currently unused.

        Returns:
            Mapping of trading day to the written file path, or
            ``{"error": message}`` when the whole operation failed. Codes
            whose fetch raised are skipped and summarized on stdout.
        """
        try:
            import AmazingData as ad

            self._ensure_connected()

            if save_path is None:
                save_path = os.path.join(self.data_save_path, "stock")
            os.makedirs(save_path, exist_ok=True)

            if codes is None:
                codes = ad.BaseData().get_code_list("EXTRA_STOCK_A")

            if trading_days is None:
                trading_days = [self._get_latest_trading_day()]

            result_files = {}
            history = ad.HistoryData()
            period_value = ad.constant.Period.day.value

            for trading_day in trading_days:
                query_date = self._to_iso_date(trading_day)
                day_data = {}
                failures = []

                for code in codes:
                    try:
                        records = self._fetch_records(
                            history, code, period_value, query_date
                        )
                    except Exception as e:
                        # Best effort: one bad code must not abort the batch.
                        failures.append((code, e))
                        continue
                    if records:
                        day_data[code] = records

                self._report_failures("batch_get_stock_kline", trading_day, failures)

                filename = f"kline_{trading_day}.json"
                filepath = os.path.join(save_path, filename)
                self._write_json(filepath, day_data)

                result_files[trading_day] = filepath

            return result_files

        except Exception as e:
            return {"error": str(e)}

    def batch_get_future_kline(
        self,
        underlying_codes: Optional[List[str]] = None,
        use_main_contract: bool = True,
        trading_days: Optional[List[str]] = None,
        save_path: Optional[str] = None
    ) -> Dict[str, str]:
        """Fetch daily K-lines for futures, one JSON file per trading day.

        Args:
            underlying_codes: Codes containing '.' are used as given; bare
                codes get ``_DEFAULT_FUTURE_CONTRACT_SUFFIX`` appended. When
                empty/None, the first 50 entries of the SDK list
                "EXTRA_FUTURE" are used.
            use_main_contract: Accepted for interface compatibility;
                currently unused.
            trading_days: Days as ``YYYYMMDD``; defaults to the latest trading day.
            save_path: Target directory; defaults to ``<data_save_path>/future``.

        Returns:
            Mapping of trading day to the written file path, or
            ``{"error": message}`` when the whole operation failed. Codes
            whose fetch raised are skipped and summarized on stdout.
        """
        try:
            import AmazingData as ad

            self._ensure_connected()

            if save_path is None:
                save_path = os.path.join(self.data_save_path, "future")
            os.makedirs(save_path, exist_ok=True)

            if trading_days is None:
                trading_days = [self._get_latest_trading_day()]

            # The code list does not depend on the trading day: resolve it once.
            if underlying_codes:
                codes = [
                    uc if '.' in uc else f"{uc}{self._DEFAULT_FUTURE_CONTRACT_SUFFIX}"
                    for uc in underlying_codes
                ]
            else:
                # Cap the number of contracts fetched.
                codes = ad.BaseData().get_code_list("EXTRA_FUTURE")[:50]

            result_files = {}
            history = ad.HistoryData()
            period_value = ad.constant.Period.day.value

            for trading_day in trading_days:
                query_date = self._to_iso_date(trading_day)
                all_data = []
                failures = []

                for code in codes:
                    try:
                        all_data.extend(
                            self._fetch_records(history, code, period_value, query_date)
                        )
                    except Exception as e:
                        # Best effort: one bad code must not abort the batch.
                        failures.append((code, e))
                        continue

                self._report_failures("batch_get_future_kline", trading_day, failures)

                filename = f"futures_{trading_day}.json"
                filepath = os.path.join(save_path, filename)
                result = {
                    "metadata": {
                        "source": "AmazingData",
                        "trading_day": trading_day,
                        "fetch_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                        "total_codes": len(codes),
                        "total_records": len(all_data)
                    },
                    "data": all_data
                }
                self._write_json(filepath, result)

                result_files[trading_day] = filepath

            return result_files

        except Exception as e:
            return {"error": str(e)}

    def get_stock_codes(self) -> List[str]:
        """Return the stock code list ("EXTRA_STOCK_A"), or [] on error."""
        return self._list_codes("EXTRA_STOCK_A")

    def get_future_codes(self) -> List[str]:
        """Return the futures code list ("EXTRA_FUTURE"), or [] on error."""
        return self._list_codes("EXTRA_FUTURE")

    def _get_latest_trading_day(self) -> str:
        """Return the latest trading day as ``YYYYMMDD``.

        Today is used, moved back to Friday on weekends. Exchange holidays
        are not taken into account.
        """
        return self._previous_weekday(date.today()).strftime('%Y%m%d')

    def test_connection(self) -> Dict[str, Any]:
        """Check the credentials with a login followed by a logout.

        Returns:
            ``{"success": bool, "message": str}``; never raises.
        """
        try:
            import AmazingData as ad

            ret = ad.login(
                username=settings.AMAZING_DATA_USERNAME,
                password=settings.AMAZING_DATA_PASSWORD,
                host=settings.AMAZING_DATA_HOST,
                port=settings.AMAZING_DATA_PORT,
            )
            if ret:
                ad.logout(settings.AMAZING_DATA_USERNAME)
                # The logout above is the same call disconnect() uses, so mark
                # the service as disconnected; the next data call logs in again.
                self.connected = False
                return {"success": True, "message": "Connection successful"}
            return {"success": False, "message": "Login failed"}
        except Exception as e:
            return {"success": False, "message": str(e)}
|
|
|
|
|
|
# Global service instance
data_service = AmazingDataPlatformService()