""" 期货数据服务 """ from typing import List, Dict from datetime import date from sqlalchemy.orm import Session from sqlalchemy import and_ import pandas as pd import logging from app.models.future import FutureKlineDaily, FutureKlineMin from app.services.sdk_manager import sdk_manager from app.services.base_data_service import BaseDataService from app.utils.date_utils import parse_date, format_date logger = logging.getLogger(__name__) class FutureService: """期货数据服务""" def __init__(self, db: Session): self.db = db self.base_service = BaseDataService(db) def _get_adapter(self): """获取SDK适配器(使用连接管理器)""" return sdk_manager.get_default_connection() def get_kline( self, codes: List[str], start_date: date, end_date: date, period: str = "daily" ) -> Dict[str, List[dict]]: """ 获取期货K线数据(带缓存) Args: codes: 代码列表 start_date: 开始日期 end_date: 结束日期 period: 周期 (daily, min1, min5, min15, min30, min60) Returns: 字典 {code: [kline_data]} """ result = {} for code in codes: try: if period == "daily": data = self._get_daily_kline_with_cache(code, start_date, end_date) else: data = self._get_min_kline_with_cache(code, start_date, end_date, period) result[code] = data except Exception as e: logger.error(f"获取{code}的K线数据失败: {str(e)}") result[code] = [] return result def _get_daily_kline_with_cache( self, code: str, start_date: date, end_date: date ) -> List[dict]: """获取期货日线数据(带缓存)""" # 1. 查询本地缓存 cached_records = self.db.query(FutureKlineDaily).filter( and_( FutureKlineDaily.code == code, FutureKlineDaily.trade_date >= start_date, FutureKlineDaily.trade_date <= end_date ) ).order_by(FutureKlineDaily.trade_date).all() # 2. 检查数据完整性 cached_dates = {r.trade_date for r in cached_records} expected_dates = set(self.base_service.get_trading_calendar("CFE", start_date, end_date)) missing_dates = expected_dates - cached_dates # 3. 如果有缺失,从SDK获取 if missing_dates: try: adapter = self._get_adapter() if adapter: sdk_data = adapter.get_kline([code], start_date, end_date, "daily") if code in sdk_data and sdk_data[code] is not None and not sdk_data[code].empty: self._save_daily_kline(code, sdk_data[code]) cached_records = self.db.query(FutureKlineDaily).filter( and_( FutureKlineDaily.code == code, FutureKlineDaily.trade_date >= start_date, FutureKlineDaily.trade_date <= end_date ) ).order_by(FutureKlineDaily.trade_date).all() except Exception as e: logger.error(f"从SDK获取{code}数据失败: {str(e)}") return [ { "trade_date": format_date(r.trade_date), "open": float(r.open), "high": float(r.high), "low": float(r.low), "close": float(r.close), "volume": int(r.volume), "amount": float(r.amount), "settle": float(r.settle) if r.settle else None, "open_interest": int(r.open_interest) if r.open_interest else None } for r in cached_records ] def _get_min_kline_with_cache( self, code: str, start_date: date, end_date: date, period: str ) -> List[dict]: """获取期货分钟线数据(带缓存)""" from datetime import datetime start_datetime = datetime.combine(start_date, datetime.min.time()) end_datetime = datetime.combine(end_date, datetime.max.time()) cached_records = self.db.query(FutureKlineMin).filter( and_( FutureKlineMin.code == code, FutureKlineMin.period_type == period, FutureKlineMin.trade_datetime >= start_datetime, FutureKlineMin.trade_datetime <= end_datetime ) ).order_by(FutureKlineMin.trade_datetime).all() if len(cached_records) < 10: try: adapter = self._get_adapter() if adapter: sdk_data = adapter.get_kline([code], start_date, end_date, period) if code in sdk_data and sdk_data[code] is not None and not sdk_data[code].empty: self._save_min_kline(code, sdk_data[code], period) cached_records = self.db.query(FutureKlineMin).filter( and_( FutureKlineMin.code == code, FutureKlineMin.period_type == period, FutureKlineMin.trade_datetime >= start_datetime, FutureKlineMin.trade_datetime <= end_datetime ) ).order_by(FutureKlineMin.trade_datetime).all() except Exception as e: logger.error(f"从SDK获取{code}分钟数据失败: {str(e)}") return [ { "trade_datetime": r.trade_datetime.isoformat(), "open": float(r.open), "high": float(r.high), "low": float(r.low), "close": float(r.close), "volume": int(r.volume), "amount": float(r.amount), "settle": float(r.settle) if r.settle else None, "open_interest": int(r.open_interest) if r.open_interest else None } for r in cached_records ] def _save_daily_kline(self, code: str, df: pd.DataFrame): """保存期货日线数据到数据库""" if df.empty: return for idx, row in df.iterrows(): kline_time = row.get("kline_time") if kline_time is None: continue trade_date = kline_time.date() if hasattr(kline_time, 'date') else parse_date(str(kline_time)[:10]) existing = self.db.query(FutureKlineDaily).filter( and_( FutureKlineDaily.code == code, FutureKlineDaily.trade_date == trade_date ) ).first() if existing: existing.open = float(row.get("open", 0)) existing.high = float(row.get("high", 0)) existing.low = float(row.get("low", 0)) existing.close = float(row.get("close", 0)) existing.volume = int(row.get("volume", 0)) existing.amount = float(row.get("amount", 0)) existing.settle = float(row.get("settle")) if pd.notna(row.get("settle")) else None existing.open_interest = int(row.get("open_interest")) if pd.notna(row.get("open_interest")) else None else: record = FutureKlineDaily( code=code, trade_date=trade_date, open=float(row.get("open", 0)), high=float(row.get("high", 0)), low=float(row.get("low", 0)), close=float(row.get("close", 0)), volume=int(row.get("volume", 0)), amount=float(row.get("amount", 0)), settle=float(row.get("settle")) if pd.notna(row.get("settle")) else None, open_interest=int(row.get("open_interest")) if pd.notna(row.get("open_interest")) else None ) self.db.add(record) self.db.commit() def _save_min_kline(self, code: str, df: pd.DataFrame, period: str): """保存期货分钟线数据到数据库""" if df.empty: return from datetime import datetime for idx, row in df.iterrows(): kline_time = row.get("kline_time") if kline_time is None: continue trade_datetime = kline_time if isinstance(kline_time, datetime) else datetime.fromisoformat(str(kline_time)) existing = self.db.query(FutureKlineMin).filter( and_( FutureKlineMin.code == code, FutureKlineMin.period_type == period, FutureKlineMin.trade_datetime == trade_datetime ) ).first() if not existing: record = FutureKlineMin( code=code, period_type=period, trade_datetime=trade_datetime, open=float(row.get("open", 0)), high=float(row.get("high", 0)), low=float(row.get("low", 0)), close=float(row.get("close", 0)), volume=int(row.get("volume", 0)), amount=float(row.get("amount", 0)), settle=float(row.get("settle")) if pd.notna(row.get("settle")) else None, open_interest=int(row.get("open_interest")) if pd.notna(row.get("open_interest")) else None ) self.db.add(record) self.db.commit() def get_kline_chart_data( self, code: str, start_date: date, end_date: date, period: str = "daily" ) -> dict: """获取K线图数据(ECharts格式)""" kline_data = self.get_kline([code], start_date, end_date, period) data = kline_data.get(code, []) if not data: return { "categoryData": [], "values": [], "volumes": [] } category_data = [] values = [] volumes = [] for i, item in enumerate(data): date_key = item.get("trade_date") or item.get("trade_datetime", "")[:10] category_data.append(date_key) values.append([ item["open"], item["close"], item["low"], item["high"], item["volume"] ]) sign = 1 if item["close"] >= item["open"] else -1 volumes.append([i, item["volume"], sign]) return { "categoryData": category_data, "values": values, "volumes": volumes } def get_cache_status(self, code: str, period: str = "daily") -> dict: """获取代码缓存状态""" if period == "daily": query = self.db.query(FutureKlineDaily).filter(FutureKlineDaily.code == code) count = query.count() min_date = query.order_by(FutureKlineDaily.trade_date).first() max_date = query.order_by(FutureKlineDaily.trade_date.desc()).first() return { "code": code, "security_type": "future", "period_type": period, "record_count": count, "min_date": format_date(min_date.trade_date) if min_date else None, "max_date": format_date(max_date.trade_date) if max_date else None } else: query = self.db.query(FutureKlineMin).filter( FutureKlineMin.code == code, FutureKlineMin.period_type == period ) count = query.count() return { "code": code, "security_type": "future", "period_type": period, "record_count": count, "min_date": None, "max_date": None }