You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
391 lines
12 KiB
391 lines
12 KiB
"""
|
|
期货 K 线服务 v2.2
|
|
支持多周期、持仓量、结算价等期货特有数据
|
|
"""
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from typing import List, Optional, Dict, Any
|
|
from enum import Enum
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.models.kline import (
|
|
Frequency, FuturesKLineItem, FuturesKLineData,
|
|
FuturesSymbolInfo, FuturesContractInfo, FuturesKLineQuery
|
|
)
|
|
from app.repositories.kline.futures_repository import FuturesKLineRepository
|
|
from app.services.cache_service import cache_service
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# 支持的期货 K 线周期
|
|
FUTURES_FREQUENCIES = [
|
|
Frequency.FREQ_1M,
|
|
Frequency.FREQ_5M,
|
|
Frequency.FREQ_15M,
|
|
Frequency.FREQ_30M,
|
|
Frequency.FREQ_1H,
|
|
Frequency.FREQ_1D,
|
|
Frequency.FREQ_1W,
|
|
Frequency.FREQ_1MONTH,
|
|
]
|
|
|
|
|
|
class FuturesKLineService:
|
|
"""期货 K 线服务"""
|
|
|
|
def __init__(self, db: Session):
|
|
self.repository = FuturesKLineRepository(db)
|
|
self.db = db
|
|
|
|
async def query_klines(
|
|
self,
|
|
symbol: str,
|
|
freq: Frequency,
|
|
start: datetime,
|
|
end: datetime,
|
|
use_cache: bool = True
|
|
) -> FuturesKLineData:
|
|
"""
|
|
查询期货 K 线数据
|
|
|
|
Args:
|
|
symbol: 合约代码 (如 IF2406, AG2605.SHF)
|
|
freq: K 线周期
|
|
start: 开始时间
|
|
end: 结束时间
|
|
use_cache: 是否使用缓存
|
|
|
|
Returns:
|
|
FuturesKLineData: K 线数据响应
|
|
|
|
Raises:
|
|
ValueError: 参数验证失败
|
|
"""
|
|
# 参数验证
|
|
self._validate_params(symbol, freq, start, end)
|
|
|
|
# 验证周期是否支持
|
|
if freq not in FUTURES_FREQUENCIES:
|
|
raise ValueError(f"不支持的期货 K 线周期: {freq}")
|
|
|
|
# 尝试从缓存获取
|
|
cache_key = None
|
|
if use_cache:
|
|
cache_key = f"futures_kline:{symbol}:{freq.value}:{start.strftime('%Y%m%d')}:{end.strftime('%Y%m%d')}"
|
|
cached = await cache_service.get(cache_key)
|
|
if cached:
|
|
logger.info(f"缓存命中: {cache_key}")
|
|
return FuturesKLineData(**cached)
|
|
|
|
# 查询数据库
|
|
items = self.repository.get_klines(symbol, freq, start, end)
|
|
|
|
# 如果没有数据,尝试从适配器获取
|
|
if not items:
|
|
logger.info(f"数据库无 {symbol} 数据,尝试从数据源获取")
|
|
items = await self._fetch_from_adapter(symbol, freq, start, end)
|
|
|
|
# 保存到数据库
|
|
if items:
|
|
self._save_klines_to_db(symbol, freq, items)
|
|
|
|
# 获取品种信息
|
|
symbol_info = self.repository.get_symbol_info(symbol)
|
|
|
|
result = FuturesKLineData(
|
|
symbol=symbol,
|
|
name=symbol_info.name if symbol_info else "",
|
|
freq=freq,
|
|
count=len(items),
|
|
items=items
|
|
)
|
|
|
|
# 写入缓存
|
|
if cache_key and items:
|
|
await cache_service.set(cache_key, result.model_dump(), expire=300) # 5分钟缓存
|
|
|
|
return result
|
|
|
|
async def query_klines_batch(
|
|
self,
|
|
symbols: List[str],
|
|
freq: Frequency,
|
|
start: datetime,
|
|
end: datetime,
|
|
use_cache: bool = True
|
|
) -> Dict[str, FuturesKLineData]:
|
|
"""
|
|
批量查询期货 K 线数据
|
|
|
|
Args:
|
|
symbols: 合约代码列表 (最多100个)
|
|
freq: K 线周期
|
|
start: 开始时间
|
|
end: 结束时间
|
|
use_cache: 是否使用缓存
|
|
|
|
Returns:
|
|
Dict[str, FuturesKLineData]: 合约代码 -> K线数据映射
|
|
"""
|
|
if len(symbols) > 100:
|
|
raise ValueError("批量查询最多支持 100 个合约")
|
|
|
|
results = {}
|
|
for symbol in symbols:
|
|
try:
|
|
results[symbol] = await self.query_klines(
|
|
symbol, freq, start, end, use_cache
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"查询 {symbol} 失败: {e}")
|
|
results[symbol] = FuturesKLineData(
|
|
symbol=symbol,
|
|
name="",
|
|
freq=freq,
|
|
count=0,
|
|
items=[],
|
|
error=str(e)
|
|
)
|
|
|
|
return results
|
|
|
|
async def get_main_contract_klines(
|
|
self,
|
|
product_code: str,
|
|
freq: Frequency,
|
|
start: datetime,
|
|
end: datetime
|
|
) -> Optional[FuturesKLineData]:
|
|
"""
|
|
获取主力合约 K 线数据
|
|
|
|
Args:
|
|
product_code: 品种代码 (如 IF, IC, AG)
|
|
freq: K 线周期
|
|
start: 开始时间
|
|
end: 结束时间
|
|
|
|
Returns:
|
|
Optional[FuturesKLineData]: 主力合约 K 线数据
|
|
"""
|
|
# 获取主力合约
|
|
main_contract = self.repository.get_main_contract(product_code)
|
|
if not main_contract:
|
|
logger.warning(f"品种 {product_code} 无主力合约")
|
|
return None
|
|
|
|
return await self.query_klines(main_contract, freq, start, end)
|
|
|
|
def get_active_contracts(
|
|
self,
|
|
product_code: str,
|
|
limit: int = 10
|
|
) -> List[FuturesContractInfo]:
|
|
"""
|
|
获取活跃合约列表
|
|
|
|
Args:
|
|
product_code: 品种代码
|
|
limit: 返回数量
|
|
|
|
Returns:
|
|
List[FuturesContractInfo]: 活跃合约列表
|
|
"""
|
|
return self.repository.get_active_contracts(product_code, limit)
|
|
|
|
def get_contract_info(self, symbol: str) -> Optional[FuturesContractInfo]:
|
|
"""
|
|
获取合约信息
|
|
|
|
Args:
|
|
symbol: 合约代码
|
|
|
|
Returns:
|
|
Optional[FuturesContractInfo]: 合约信息
|
|
"""
|
|
return self.repository.get_contract_info(symbol)
|
|
|
|
def _validate_params(
|
|
self,
|
|
symbol: str,
|
|
freq: Frequency,
|
|
start: datetime,
|
|
end: datetime
|
|
) -> None:
|
|
"""验证查询参数"""
|
|
if not symbol:
|
|
raise ValueError("合约代码不能为空")
|
|
|
|
if start >= end:
|
|
raise ValueError("开始时间必须早于结束时间")
|
|
|
|
# 时间范围限制
|
|
max_days = {
|
|
Frequency.FREQ_1M: 30,
|
|
Frequency.FREQ_5M: 60,
|
|
Frequency.FREQ_15M: 90,
|
|
Frequency.FREQ_30M: 180,
|
|
Frequency.FREQ_1H: 365,
|
|
Frequency.FREQ_1D: 3650, # 10年
|
|
Frequency.FREQ_1W: 3650,
|
|
Frequency.FREQ_1MONTH: 3650,
|
|
}
|
|
|
|
days = (end - start).days
|
|
max_allowed = max_days.get(freq, 365)
|
|
if days > max_allowed:
|
|
raise ValueError(f"{freq.value} 周期最多查询 {max_allowed} 天数据")
|
|
|
|
async def _fetch_from_adapter(
|
|
self,
|
|
symbol: str,
|
|
freq: Frequency,
|
|
start: datetime,
|
|
end: datetime
|
|
) -> List[FuturesKLineItem]:
|
|
"""
|
|
从数据源适配器获取 K 线数据
|
|
|
|
注意: 此方法修复了原代码中的异步问题
|
|
原代码错误地使用 asyncio.new_event_loop()
|
|
"""
|
|
try:
|
|
from app.services.amazing_data_service import AmazingDataService
|
|
|
|
# 获取服务实例
|
|
service = AmazingDataService()
|
|
|
|
# 转换频率格式
|
|
period_map = {
|
|
Frequency.FREQ_1M: "min1",
|
|
Frequency.FREQ_5M: "min5",
|
|
Frequency.FREQ_15M: "min15",
|
|
Frequency.FREQ_30M: "min30",
|
|
Frequency.FREQ_1H: "min60",
|
|
Frequency.FREQ_1D: "day",
|
|
Frequency.FREQ_1W: "week",
|
|
Frequency.FREQ_1MONTH: "month",
|
|
}
|
|
period = period_map.get(freq, "day")
|
|
|
|
# 使用上下文管理器确保连接正确关闭
|
|
async with service.get_connection() as conn:
|
|
# 调用适配器获取数据
|
|
df = await conn.get_kline(
|
|
symbol=symbol,
|
|
period=period,
|
|
start_time=start.strftime("%Y%m%d%H%M%S"),
|
|
end_time=end.strftime("%Y%m%d%H%M%S")
|
|
)
|
|
|
|
if df is None or df.empty:
|
|
logger.warning(f"适配器返回空数据: {symbol}")
|
|
return []
|
|
|
|
# 转换为 KLineItem 列表
|
|
items = []
|
|
for _, row in df.iterrows():
|
|
items.append(FuturesKLineItem(
|
|
symbol=symbol,
|
|
time=row.get('time', row.name) if isinstance(row.get('time'), datetime)
|
|
else datetime.strptime(str(row.get('time', row.name)), "%Y-%m-%d %H:%M:%S"),
|
|
open=float(row.get('open', 0)),
|
|
high=float(row.get('high', 0)),
|
|
low=float(row.get('low', 0)),
|
|
close=float(row.get('close', 0)),
|
|
volume=int(row.get('volume', 0)),
|
|
open_interest=int(row.get('open_interest', 0)) if 'open_interest' in row else 0,
|
|
settlement_price=float(row.get('settlement', 0)) if 'settlement' in row else None,
|
|
trade_date=row.get('trade_date', row.get('time', row.name).date() if hasattr(row.get('time', row.name), 'date') else None)
|
|
))
|
|
|
|
logger.info(f"从适配器获取 {symbol} {freq.value} 数据 {len(items)} 条")
|
|
return items
|
|
|
|
except ImportError:
|
|
logger.warning("AmazingDataService 未安装,无法从适配器获取数据")
|
|
return []
|
|
except Exception as e:
|
|
logger.error(f"从适配器获取数据失败: {e}")
|
|
return []
|
|
|
|
def _save_klines_to_db(
|
|
self,
|
|
symbol: str,
|
|
freq: Frequency,
|
|
items: List[FuturesKLineItem]
|
|
) -> int:
|
|
"""
|
|
保存 K 线数据到数据库
|
|
|
|
Args:
|
|
symbol: 合约代码
|
|
freq: K 线周期
|
|
items: K 线数据列表
|
|
|
|
Returns:
|
|
int: 保存的数量
|
|
"""
|
|
try:
|
|
count = self.repository.save_klines(symbol, freq, items)
|
|
logger.info(f"保存 {symbol} {freq.value} 数据 {count} 条")
|
|
return count
|
|
except Exception as e:
|
|
logger.error(f"保存数据失败: {e}")
|
|
return 0
|
|
|
|
async def sync_klines(
|
|
self,
|
|
symbol: str,
|
|
freq: Frequency,
|
|
days: int = 30
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
同步 K 线数据
|
|
|
|
Args:
|
|
symbol: 合约代码
|
|
freq: K 线周期
|
|
days: 同步天数
|
|
|
|
Returns:
|
|
Dict: 同步结果
|
|
"""
|
|
end = datetime.now()
|
|
start = end - timedelta(days=days)
|
|
|
|
# 获取最新时间戳
|
|
latest = self.repository.get_latest_timestamp(symbol, freq)
|
|
if latest and latest > start:
|
|
start = latest + timedelta(seconds=1)
|
|
|
|
if start >= end:
|
|
return {
|
|
"symbol": symbol,
|
|
"freq": freq.value,
|
|
"status": "already_synced",
|
|
"count": 0
|
|
}
|
|
|
|
# 从适配器获取数据
|
|
items = await self._fetch_from_adapter(symbol, freq, start, end)
|
|
|
|
# 保存到数据库
|
|
count = self._save_klines_to_db(symbol, freq, items)
|
|
|
|
return {
|
|
"symbol": symbol,
|
|
"freq": freq.value,
|
|
"status": "synced",
|
|
"count": count,
|
|
"start": start.isoformat(),
|
|
"end": end.isoformat()
|
|
}
|
|
|
|
|
|
# 服务工厂函数
|
|
def get_futures_kline_service(db: Session) -> FuturesKLineService:
|
|
"""获取期货 K 线服务实例"""
|
|
return FuturesKLineService(db) |