""" 期货智析接口 - 提供期货分析数据 """ import json import logging from pathlib import Path from typing import Optional from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.orm import Session from app.database import get_db from app.services.cache import get_cached_data, get_latest_cached logger = logging.getLogger(__name__) router = APIRouter(prefix="/futures", tags=["期货智析"]) CONFIG_DIR = Path(__file__).resolve().parent.parent.parent / "config" SYMBOLS_CONFIG_FILE = CONFIG_DIR / "symbols_config.json" def _load_symbols_config() -> dict: """加载品种配置文件""" if not SYMBOLS_CONFIG_FILE.exists(): return {"futures": {}, "stock": {}} with open(SYMBOLS_CONFIG_FILE, "r", encoding="utf-8") as f: return json.load(f) @router.get("/list") def get_futures_list(db: Session = Depends(get_db)): """获取所有期货品种列表及摘要信息(从symbols_config.json读取)""" config = _load_symbols_config() futures_config = config.get("futures", {}) if not futures_config: return {"success": True, "data": []} futures_data = [] for name, symbol_code in futures_config.items(): cached = get_cached_data(db, symbol_code, "futures") if cached and cached.get("timeframes"): all_candles = [] for period, candles in cached.get("timeframes", {}).items(): all_candles.extend(candles) if all_candles: latest_candle = all_candles[-1] open_price = float(latest_candle.get("open", 0)) close_price = float(latest_candle.get("close", 0)) high_price = float(latest_candle.get("high", 0)) low_price = float(latest_candle.get("low", 0)) change = close_price - open_price change_pct = (change / open_price * 100) if open_price > 0 else 0 futures_data.append({ "symbol": symbol_code, "name": name, "price": close_price, "change": round(change, 2), "changePct": round(change_pct, 2), "suggestion": _get_suggestion(close_price, open_price, change_pct), "suggestionType": "up" if change >= 0 else "down", "periods": _get_period_trends(all_candles), "successRate": _calc_success_rate(all_candles), "trendScore": _calc_trend_score(all_candles), "resistance": round(high_price * 1.02, 2), "support": round(low_price * 0.98, 2), "open": open_price, "high": high_price, "low": low_price, "volume": sum(float(c.get("volume", 0)) for c in all_candles) }) else: futures_data.append({ "symbol": symbol_code, "name": name, "price": 0, "change": 0, "changePct": 0, "suggestion": "等待数据", "suggestionType": "neutral", "periods": {"5": "neutral", "15": "neutral", "30": "neutral", "60": "neutral"}, "successRate": 0, "trendScore": 0, "resistance": 0, "support": 0, "open": 0, "high": 0, "low": 0, "volume": 0 }) return {"success": True, "data": futures_data} @router.get("/detail/{symbol}") def get_futures_detail(symbol: str, db: Session = Depends(get_db)): """获取指定期货品种的详细分析数据""" cached = get_cached_data(db, symbol, "futures") if not cached: raise HTTPException(status_code=404, detail=f"未找到 {symbol} 的缓存数据") all_candles = [] for period, candles in cached.get("timeframes", {}).items(): all_candles.extend(candles) if not all_candles: raise HTTPException(status_code=404, detail=f"未找到 {symbol} 的K线数据") latest_candle = all_candles[-1] open_price = float(latest_candle.get("open", 0)) close_price = float(latest_candle.get("close", 0)) high_price = float(latest_candle.get("high", 0)) low_price = float(latest_candle.get("low", 0)) change = close_price - open_price change_pct = (change / open_price * 100) if open_price > 0 else 0 resistance1 = round(high_price * 1.01, 2) resistance2 = round(high_price * 1.03, 2) resistance3 = round(high_price * 1.05, 2) support1 = round(low_price * 0.99, 2) support2 = round(low_price * 0.97, 2) support3 = round(low_price * 0.95, 2) suggestion = _get_suggestion(close_price, open_price, change_pct) suggestion_type = "up" if change >= 0 else "down" trend_score = _calc_trend_score(all_candles) data = { "symbol": symbol, "name": _get_futures_name(symbol), "price": close_price, "change": round(change, 2), "changePct": round(change_pct, 2), "suggestion": suggestion, "suggestionType": suggestion_type, "suggestionReason": _get_suggestion_reason(symbol, suggestion), "open": open_price, "high": high_price, "low": low_price, "volume": sum(float(c.get("volume", 0)) for c in all_candles), "entryPrice": round(close_price * 0.995, 2) if change >= 0 else round(close_price * 1.005, 2), "targetPrice": resistance1 if change >= 0 else support1, "stopLoss": support1 if change >= 0 else resistance1, "riskLevel": "低" if trend_score >= 80 else "中" if trend_score >= 60 else "高", "macd": _calc_macd(all_candles), "rsi": _calc_rsi(all_candles), "boll": _calc_boll(all_candles), "kdj": _calc_kdj(all_candles), "resistances": [resistance1, resistance2, resistance3], "supports": [support1, support2, support3], "periodConsistency": _get_period_trends(all_candles) } return {"success": True, "data": data} @router.get("/kline/{symbol}") def get_kline_data(symbol: str, period: str = "15", db: Session = Depends(get_db)): """获取指定品种和周期的K线数据""" period_map = { "5": "5min", "15": "15min", "30": "30min", "60": "60min", "1440": "daily", "daily": "daily" } db_period = period_map.get(period, f"{period}min") cached = get_cached_data(db, symbol, "futures", [db_period]) if not cached or not cached.get("timeframes"): raise HTTPException(status_code=404, detail=f"未找到 {symbol} {db_period} 的缓存数据") candles = cached["timeframes"].get(db_period, []) kline_data = [] for c in candles: time_str = c.get("datetime", c.get("time", "")) if time_str and len(time_str) >= 16: time_str = time_str[:16].replace("T", " ") kline_data.append([ time_str, str(c.get("open", 0)), str(c.get("close", 0)), str(c.get("low", 0)), str(c.get("high", 0)), str(int(c.get("volume", 0))) ]) return {"success": True, "data": kline_data} def _get_futures_name(symbol: str) -> str: """根据合约代码获取品种名称""" name_map = { "AU": "黄金", "AG": "白银", "CU": "铜", "AL": "铝", "ZN": "锌", "NI": "镍", "SN": "锡", "PB": "铅", "RB": "螺纹钢", "HC": "热卷", "I": "铁矿石", "J": "焦炭", "JM": "焦煤", "ZC": "动力煤", "MA": "甲醇", "TA": "PTA", "EG": "乙二醇", "PP": "聚丙烯", "L": "塑料", "V": "PVC", "M": "豆粕", "RM": "菜粕", "C": "玉米", "CS": "淀粉", "A": "豆一", "B": "豆二", "Y": "豆油", "P": "棕榈油", "OI": "菜油", "CF": "棉花", "SR": "白糖", "AP": "苹果", "JD": "鸡蛋", "LH": "生猪", "FU": "燃料油", "LU": "低硫燃油", "SC": "原油", "EC": "集运指数", "BU": "沥青", "RU": "橡胶", "NR": "20号胶", "SP": "纸浆", "SS": "不锈钢", "SA": "纯碱", "FG": "玻璃", "UR": "尿素", "SF": "硅铁", "SM": "锰硅", "IF": "沪深300", "IC": "中证500", "IH": "上证50", "IM": "中证1000", "T": "10年期国债", "TF": "5年期国债", "TS": "2年期国债", "TL": "30年期国债", } return name_map.get(symbol, symbol) def _get_suggestion(close: float, open: float, change_pct: float) -> str: """根据价格走势给出操作建议""" if change_pct > 2: return "逢低做多" elif change_pct > 0.5: return "逢低做多" elif change_pct > -0.5: return "观望等待" elif change_pct > -2: return "逢高做空" else: return "逢高做空" def _get_suggestion_reason(symbol: str, suggestion: str) -> str: """获取建议理由""" reasons = { "逢低做多": "技术面突破,趋势明确,建议逢低介入", "逢高做空": "技术面走弱,下行压力增大", "观望等待": "多空力量均衡,等待方向明确" } return reasons.get(suggestion, "等待进一步信号") def _get_period_trends(candles: list) -> dict: """计算各周期趋势 - 根据不同周期取不同长度的K线计算""" period_config = { "5": {"bars": 10, "threshold": 0.003}, "15": {"bars": 15, "threshold": 0.005}, "30": {"bars": 20, "threshold": 0.008}, "60": {"bars": 30, "threshold": 0.01} } result = {} for period, cfg in period_config.items(): bars = cfg["bars"] threshold = cfg["threshold"] if len(candles) < bars: result[period] = "neutral" continue recent = candles[-bars:] first_close = float(recent[0].get("close", 0)) last_close = float(recent[-1].get("close", 0)) if first_close <= 0: result[period] = "neutral" continue change_pct = (last_close - first_close) / first_close if change_pct > threshold: result[period] = "up" elif change_pct < -threshold: result[period] = "down" else: result[period] = "neutral" return result def _calc_success_rate(candles: list) -> int: """计算交易成功率(简化版)""" if len(candles) < 10: return 50 wins = 0 for i in range(1, len(candles)): prev_close = float(candles[i-1].get("close", 0)) curr_close = float(candles[i].get("close", 0)) if curr_close >= prev_close: wins += 1 return int(wins / (len(candles) - 1) * 100) def _calc_trend_score(candles: list) -> int: """计算趋势评分(0-100)""" if len(candles) < 5: return 50 recent = candles[-10:] closes = [float(c.get("close", 0)) for c in recent] if len(closes) < 2: return 50 up_count = sum(1 for i in range(1, len(closes)) if closes[i] >= closes[i-1]) score = int(up_count / (len(closes) - 1) * 100) return max(0, min(100, score)) def _calc_ema(data: list, period: int) -> list: """计算EMA,返回与输入等长的列表,前面用None填充""" ema = [None] * len(data) multiplier = 2 / (period + 1) if len(data) < period: return ema ema[period - 1] = sum(data[:period]) / period for i in range(period, len(data)): ema[i] = (data[i] - ema[i-1]) * multiplier + ema[i-1] return ema def _calc_macd(candles: list) -> dict: """计算MACD指标""" if len(candles) < 26: return {"signal": "中性", "detail": "数据不足"} closes = [float(c.get("close", 0)) for c in candles] ema12 = _calc_ema(closes, 12) ema26 = _calc_ema(closes, 26) dif_list = [] for i in range(len(closes)): if ema12[i] is not None and ema26[i] is not None: dif_list.append(ema12[i] - ema26[i]) else: dif_list.append(None) # 只对有效DIF值计算DEA,避免None替换为0导致计算错误 dif_valid = [d for d in dif_list if d is not None] if dif_valid: dea_valid = _calc_ema(dif_valid, 9) dea_list = [None] * (len(dif_list) - len(dif_valid)) + dea_valid else: dea_list = [None] * len(dif_list) dif = dif_list[-1] dea = dea_list[-1] if dif is not None and dea is not None: if dif > dea: signal = "金叉" elif dif < dea: signal = "死叉" else: signal = "中性" else: signal = "中性" return {"signal": signal, "detail": f"DIF: {dif:.4f}"} def _calc_rsi(candles: list) -> dict: """计算RSI指标""" if len(candles) < 15: return {"value": 50, "status": "正常"} closes = [float(c.get("close", 0)) for c in candles[-15:]] gains = [] losses = [] for i in range(1, len(closes)): diff = closes[i] - closes[i-1] gains.append(max(0, diff)) losses.append(max(0, -diff)) avg_gain = sum(gains) / len(gains) if gains else 0 avg_loss = sum(losses) / len(losses) if losses else 0 if avg_loss == 0: rsi = 100 else: rs = avg_gain / avg_loss rsi = 100 - (100 / (1 + rs)) rsi = int(rsi) if rsi > 70: status = "超买" elif rsi < 30: status = "超卖" else: status = "正常" return {"value": rsi, "status": status} def _calc_boll(candles: list) -> dict: """计算布林带""" if len(candles) < 20: return {"signal": "中轨", "detail": "区间: --"} closes = [float(c.get("close", 0)) for c in candles[-20:]] ma = sum(closes) / len(closes) std = (sum((c - ma) ** 2 for c in closes) / len(closes)) ** 0.5 upper = ma + 2 * std lower = ma - 2 * std current = closes[-1] if current > upper: signal = "上轨外" elif current < lower: signal = "下轨外" elif current > ma: signal = "中轨上" else: signal = "中轨" return {"signal": signal, "detail": f"区间: {lower:.0f}-{upper:.0f}"} def _calc_kdj(candles: list) -> dict: """计算KDJ指标""" if len(candles) < 9: return {"signal": "中性", "detail": "K: -- D: --"} highs = [float(c.get("high", 0)) for c in candles[-9:]] lows = [float(c.get("low", 0)) for c in candles[-9:]] closes = [float(c.get("close", 0)) for c in candles[-9:]] highest = max(highs) lowest = min(lows) current = closes[-1] if highest == lowest: rsv = 50 else: rsv = (current - lowest) / (highest - lowest) * 100 k = int(rsv * 2 / 3 + 50 / 3) d = int(k * 2 / 3 + 50 / 3) if k > d: signal = "偏多" elif k < d: signal = "偏空" else: signal = "中性" return {"signal": signal, "detail": f"K: {k} D: {d}"}