You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

391 lines
12 KiB

"""
期货 K 线服务 v2.2
支持多周期、持仓量、结算价等期货特有数据
"""
import logging
from datetime import datetime, timedelta
from typing import List, Optional, Dict, Any
from enum import Enum
from sqlalchemy.orm import Session
from app.models.kline import (
Frequency, FuturesKLineItem, FuturesKLineData,
FuturesSymbolInfo, FuturesContractInfo, FuturesKLineQuery
)
from app.repositories.kline.futures_repository import FuturesKLineRepository
from app.services.cache_service import cache_service
logger = logging.getLogger(__name__)
# 支持的期货 K 线周期
FUTURES_FREQUENCIES = [
Frequency.FREQ_1M,
Frequency.FREQ_5M,
Frequency.FREQ_15M,
Frequency.FREQ_30M,
Frequency.FREQ_1H,
Frequency.FREQ_1D,
Frequency.FREQ_1W,
Frequency.FREQ_1MONTH,
]
class FuturesKLineService:
"""期货 K 线服务"""
def __init__(self, db: Session):
self.repository = FuturesKLineRepository(db)
self.db = db
async def query_klines(
self,
symbol: str,
freq: Frequency,
start: datetime,
end: datetime,
use_cache: bool = True
) -> FuturesKLineData:
"""
查询期货 K 线数据
Args:
symbol: 合约代码 (如 IF2406, AG2605.SHF)
freq: K 线周期
start: 开始时间
end: 结束时间
use_cache: 是否使用缓存
Returns:
FuturesKLineData: K 线数据响应
Raises:
ValueError: 参数验证失败
"""
# 参数验证
self._validate_params(symbol, freq, start, end)
# 验证周期是否支持
if freq not in FUTURES_FREQUENCIES:
raise ValueError(f"不支持的期货 K 线周期: {freq}")
# 尝试从缓存获取
cache_key = None
if use_cache:
cache_key = f"futures_kline:{symbol}:{freq.value}:{start.strftime('%Y%m%d')}:{end.strftime('%Y%m%d')}"
cached = await cache_service.get(cache_key)
if cached:
logger.info(f"缓存命中: {cache_key}")
return FuturesKLineData(**cached)
# 查询数据库
items = self.repository.get_klines(symbol, freq, start, end)
# 如果没有数据,尝试从适配器获取
if not items:
logger.info(f"数据库无 {symbol} 数据,尝试从数据源获取")
items = await self._fetch_from_adapter(symbol, freq, start, end)
# 保存到数据库
if items:
self._save_klines_to_db(symbol, freq, items)
# 获取品种信息
symbol_info = self.repository.get_symbol_info(symbol)
result = FuturesKLineData(
symbol=symbol,
name=symbol_info.name if symbol_info else "",
freq=freq,
count=len(items),
items=items
)
# 写入缓存
if cache_key and items:
await cache_service.set(cache_key, result.model_dump(), expire=300) # 5分钟缓存
return result
async def query_klines_batch(
self,
symbols: List[str],
freq: Frequency,
start: datetime,
end: datetime,
use_cache: bool = True
) -> Dict[str, FuturesKLineData]:
"""
批量查询期货 K 线数据
Args:
symbols: 合约代码列表 (最多100个)
freq: K 线周期
start: 开始时间
end: 结束时间
use_cache: 是否使用缓存
Returns:
Dict[str, FuturesKLineData]: 合约代码 -> K线数据映射
"""
if len(symbols) > 100:
raise ValueError("批量查询最多支持 100 个合约")
results = {}
for symbol in symbols:
try:
results[symbol] = await self.query_klines(
symbol, freq, start, end, use_cache
)
except Exception as e:
logger.error(f"查询 {symbol} 失败: {e}")
results[symbol] = FuturesKLineData(
symbol=symbol,
name="",
freq=freq,
count=0,
items=[],
error=str(e)
)
return results
async def get_main_contract_klines(
self,
product_code: str,
freq: Frequency,
start: datetime,
end: datetime
) -> Optional[FuturesKLineData]:
"""
获取主力合约 K 线数据
Args:
product_code: 品种代码 (如 IF, IC, AG)
freq: K 线周期
start: 开始时间
end: 结束时间
Returns:
Optional[FuturesKLineData]: 主力合约 K 线数据
"""
# 获取主力合约
main_contract = self.repository.get_main_contract(product_code)
if not main_contract:
logger.warning(f"品种 {product_code} 无主力合约")
return None
return await self.query_klines(main_contract, freq, start, end)
def get_active_contracts(
self,
product_code: str,
limit: int = 10
) -> List[FuturesContractInfo]:
"""
获取活跃合约列表
Args:
product_code: 品种代码
limit: 返回数量
Returns:
List[FuturesContractInfo]: 活跃合约列表
"""
return self.repository.get_active_contracts(product_code, limit)
def get_contract_info(self, symbol: str) -> Optional[FuturesContractInfo]:
"""
获取合约信息
Args:
symbol: 合约代码
Returns:
Optional[FuturesContractInfo]: 合约信息
"""
return self.repository.get_contract_info(symbol)
def _validate_params(
self,
symbol: str,
freq: Frequency,
start: datetime,
end: datetime
) -> None:
"""验证查询参数"""
if not symbol:
raise ValueError("合约代码不能为空")
if start >= end:
raise ValueError("开始时间必须早于结束时间")
# 时间范围限制
max_days = {
Frequency.FREQ_1M: 30,
Frequency.FREQ_5M: 60,
Frequency.FREQ_15M: 90,
Frequency.FREQ_30M: 180,
Frequency.FREQ_1H: 365,
Frequency.FREQ_1D: 3650, # 10年
Frequency.FREQ_1W: 3650,
Frequency.FREQ_1MONTH: 3650,
}
days = (end - start).days
max_allowed = max_days.get(freq, 365)
if days > max_allowed:
raise ValueError(f"{freq.value} 周期最多查询 {max_allowed} 天数据")
async def _fetch_from_adapter(
self,
symbol: str,
freq: Frequency,
start: datetime,
end: datetime
) -> List[FuturesKLineItem]:
"""
从数据源适配器获取 K 线数据
注意: 此方法修复了原代码中的异步问题
原代码错误地使用 asyncio.new_event_loop()
"""
try:
from app.services.amazing_data_service import AmazingDataService
# 获取服务实例
service = AmazingDataService()
# 转换频率格式
period_map = {
Frequency.FREQ_1M: "min1",
Frequency.FREQ_5M: "min5",
Frequency.FREQ_15M: "min15",
Frequency.FREQ_30M: "min30",
Frequency.FREQ_1H: "min60",
Frequency.FREQ_1D: "day",
Frequency.FREQ_1W: "week",
Frequency.FREQ_1MONTH: "month",
}
period = period_map.get(freq, "day")
# 使用上下文管理器确保连接正确关闭
async with service.get_connection() as conn:
# 调用适配器获取数据
df = await conn.get_kline(
symbol=symbol,
period=period,
start_time=start.strftime("%Y%m%d%H%M%S"),
end_time=end.strftime("%Y%m%d%H%M%S")
)
if df is None or df.empty:
logger.warning(f"适配器返回空数据: {symbol}")
return []
# 转换为 KLineItem 列表
items = []
for _, row in df.iterrows():
items.append(FuturesKLineItem(
symbol=symbol,
time=row.get('time', row.name) if isinstance(row.get('time'), datetime)
else datetime.strptime(str(row.get('time', row.name)), "%Y-%m-%d %H:%M:%S"),
open=float(row.get('open', 0)),
high=float(row.get('high', 0)),
low=float(row.get('low', 0)),
close=float(row.get('close', 0)),
volume=int(row.get('volume', 0)),
open_interest=int(row.get('open_interest', 0)) if 'open_interest' in row else 0,
settlement_price=float(row.get('settlement', 0)) if 'settlement' in row else None,
trade_date=row.get('trade_date', row.get('time', row.name).date() if hasattr(row.get('time', row.name), 'date') else None)
))
logger.info(f"从适配器获取 {symbol} {freq.value} 数据 {len(items)}")
return items
except ImportError:
logger.warning("AmazingDataService 未安装,无法从适配器获取数据")
return []
except Exception as e:
logger.error(f"从适配器获取数据失败: {e}")
return []
def _save_klines_to_db(
self,
symbol: str,
freq: Frequency,
items: List[FuturesKLineItem]
) -> int:
"""
保存 K 线数据到数据库
Args:
symbol: 合约代码
freq: K 线周期
items: K 线数据列表
Returns:
int: 保存的数量
"""
try:
count = self.repository.save_klines(symbol, freq, items)
logger.info(f"保存 {symbol} {freq.value} 数据 {count}")
return count
except Exception as e:
logger.error(f"保存数据失败: {e}")
return 0
async def sync_klines(
self,
symbol: str,
freq: Frequency,
days: int = 30
) -> Dict[str, Any]:
"""
同步 K 线数据
Args:
symbol: 合约代码
freq: K 线周期
days: 同步天数
Returns:
Dict: 同步结果
"""
end = datetime.now()
start = end - timedelta(days=days)
# 获取最新时间戳
latest = self.repository.get_latest_timestamp(symbol, freq)
if latest and latest > start:
start = latest + timedelta(seconds=1)
if start >= end:
return {
"symbol": symbol,
"freq": freq.value,
"status": "already_synced",
"count": 0
}
# 从适配器获取数据
items = await self._fetch_from_adapter(symbol, freq, start, end)
# 保存到数据库
count = self._save_klines_to_db(symbol, freq, items)
return {
"symbol": symbol,
"freq": freq.value,
"status": "synced",
"count": count,
"start": start.isoformat(),
"end": end.isoformat()
}
# 服务工厂函数
def get_futures_kline_service(db: Session) -> FuturesKLineService:
"""获取期货 K 线服务实例"""
return FuturesKLineService(db)