""" 期货智析接口 - 提供期货分析数据 """ import json import logging from pathlib import Path from typing import Optional import threading from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks from sqlalchemy.orm import Session from app.database import get_db from app.analysis_db import get_analysis_db from app.analysis_models import FuturesAnalysis, WatchedSymbol, AIModelConfig, AnalysisSettings, AIAnalysisCache from app.services.cache import get_cached_data, get_latest_cached, save_market_data from app.services.collector import fetch_symbol_data logger = logging.getLogger(__name__) router = APIRouter(prefix="/futures", tags=["期货智析"]) CONFIG_DIR = Path(__file__).resolve().parent.parent.parent / "config" SYMBOLS_CONFIG_FILE = CONFIG_DIR / "symbols_config.json" def _load_symbols_config() -> dict: """加载品种配置文件""" if not SYMBOLS_CONFIG_FILE.exists(): return {"futures": {}, "stock": {}} with open(SYMBOLS_CONFIG_FILE, "r", encoding="utf-8") as f: return json.load(f) @router.get("/list") def get_futures_list(db: Session = Depends(get_db)): """获取所有期货品种列表及摘要信息(从symbols_config.json读取)""" config = _load_symbols_config() futures_config = config.get("futures", {}) if not futures_config: return {"success": True, "data": []} futures_data = [] for name, symbol_code in futures_config.items(): cached = get_cached_data(db, symbol_code, "futures") if cached and cached.get("timeframes"): all_candles = [] for period, candles in cached.get("timeframes", {}).items(): all_candles.extend(candles) if all_candles: latest_candle = all_candles[-1] open_price = float(latest_candle.get("open", 0)) close_price = float(latest_candle.get("close", 0)) high_price = float(latest_candle.get("high", 0)) low_price = float(latest_candle.get("low", 0)) change = close_price - open_price change_pct = (change / open_price * 100) if open_price > 0 else 0 futures_data.append({ "symbol": symbol_code, "name": name, "price": close_price, "change": round(change, 2), "changePct": round(change_pct, 2), "suggestion": _get_suggestion(close_price, open_price, change_pct), "suggestionType": "up" if change >= 0 else "down", "periods": _get_period_trends(all_candles), "successRate": _calc_success_rate(all_candles), "trendScore": _calc_trend_score(all_candles), "resistance": round(2 * ((high_price + low_price + close_price) / 3) - low_price, 2), "support": round(2 * ((high_price + low_price + close_price) / 3) - high_price, 2), "open": open_price, "high": high_price, "low": low_price, "volume": sum(float(c.get("volume", 0)) for c in all_candles) }) else: futures_data.append({ "symbol": symbol_code, "name": name, "price": 0, "change": 0, "changePct": 0, "suggestion": "等待数据", "suggestionType": "neutral", "periods": {"5": "neutral", "15": "neutral", "30": "neutral", "60": "neutral"}, "successRate": 0, "trendScore": 0, "resistance": 0, "support": 0, "open": 0, "high": 0, "low": 0, "volume": 0 }) return {"success": True, "data": futures_data} @router.get("/detail/{symbol}") def get_futures_detail(symbol: str, db: Session = Depends(get_db)): """获取指定期货品种的详细分析数据""" cached = get_cached_data(db, symbol, "futures") if not cached: raise HTTPException(status_code=404, detail=f"未找到 {symbol} 的缓存数据") all_candles = [] for period, candles in cached.get("timeframes", {}).items(): all_candles.extend(candles) if not all_candles: raise HTTPException(status_code=404, detail=f"未找到 {symbol} 的K线数据") latest_candle = all_candles[-1] open_price = float(latest_candle.get("open", 0)) close_price = float(latest_candle.get("close", 0)) high_price = float(latest_candle.get("high", 0)) low_price = float(latest_candle.get("low", 0)) change = close_price - open_price change_pct = (change / open_price * 100) if open_price > 0 else 0 # Pivot Point 公式计算关键点位 pp = (high_price + low_price + close_price) / 3 r1 = round(2 * pp - low_price, 2) r2 = round(pp + (high_price - low_price), 2) s1 = round(2 * pp - high_price, 2) s2 = round(pp - (high_price - low_price), 2) suggestion = _get_suggestion(close_price, open_price, change_pct) suggestion_type = "up" if change >= 0 else "down" trend_score = _calc_trend_score(all_candles) data = { "symbol": symbol, "name": _get_futures_name(symbol), "price": close_price, "change": round(change, 2), "changePct": round(change_pct, 2), "suggestion": suggestion, "suggestionType": suggestion_type, "suggestionReason": _get_suggestion_reason(symbol, suggestion), "open": open_price, "high": high_price, "low": low_price, "volume": sum(float(c.get("volume", 0)) for c in all_candles), "entryPrice": round(close_price * 0.995, 2) if change >= 0 else round(close_price * 1.005, 2), "targetPrice": r1 if change >= 0 else s1, "stopLoss": s1 if change >= 0 else r1, "riskLevel": "低" if trend_score >= 80 else "中" if trend_score >= 60 else "高", "macd": _calc_macd(all_candles), "rsi": _calc_rsi(all_candles), "boll": _calc_boll(all_candles), "kdj": _calc_kdj(all_candles), "resistances": [r1, r2], "supports": [s1, s2], "pivotPoint": round(pp, 2), "periodConsistency": _get_period_trends(all_candles) } return {"success": True, "data": data} @router.get("/kline/{symbol}") def get_kline_data(symbol: str, period: str = "15", db: Session = Depends(get_db)): """获取指定品种和周期的K线数据""" period_map = { "5": "5min", "15": "15min", "30": "30min", "60": "60min", "1440": "daily", "daily": "daily" } db_period = period_map.get(period, f"{period}min") cached = get_cached_data(db, symbol, "futures", [db_period]) if not cached or not cached.get("timeframes"): raise HTTPException(status_code=404, detail=f"未找到 {symbol} {db_period} 的缓存数据") candles = cached["timeframes"].get(db_period, []) kline_data = [] for c in candles: time_str = c.get("datetime", c.get("time", "")) if time_str and len(time_str) >= 16: time_str = time_str[:16].replace("T", " ") kline_data.append([ time_str, str(c.get("open", 0)), str(c.get("close", 0)), str(c.get("low", 0)), str(c.get("high", 0)), str(int(c.get("volume", 0))) ]) return {"success": True, "data": kline_data} def _get_futures_name(symbol: str) -> str: """根据合约代码获取品种名称""" name_map = { "AU": "黄金", "AG": "白银", "CU": "铜", "AL": "铝", "ZN": "锌", "NI": "镍", "SN": "锡", "PB": "铅", "RB": "螺纹钢", "HC": "热卷", "I": "铁矿石", "J": "焦炭", "JM": "焦煤", "ZC": "动力煤", "MA": "甲醇", "TA": "PTA", "EG": "乙二醇", "PP": "聚丙烯", "L": "塑料", "V": "PVC", "M": "豆粕", "RM": "菜粕", "C": "玉米", "CS": "淀粉", "A": "豆一", "B": "豆二", "Y": "豆油", "P": "棕榈油", "OI": "菜油", "CF": "棉花", "SR": "白糖", "AP": "苹果", "JD": "鸡蛋", "LH": "生猪", "FU": "燃料油", "LU": "低硫燃油", "SC": "原油", "EC": "集运指数", "BU": "沥青", "RU": "橡胶", "NR": "20号胶", "SP": "纸浆", "SS": "不锈钢", "SA": "纯碱", "FG": "玻璃", "UR": "尿素", "SF": "硅铁", "SM": "锰硅", "IF": "沪深300", "IC": "中证500", "IH": "上证50", "IM": "中证1000", "T": "10年期国债", "TF": "5年期国债", "TS": "2年期国债", "TL": "30年期国债", } return name_map.get(symbol, symbol) def _get_suggestion(close: float, open: float, change_pct: float) -> str: """根据价格走势给出操作建议""" if change_pct > 2: return "逢低做多" elif change_pct > 0.5: return "逢低做多" elif change_pct > -0.5: return "观望等待" elif change_pct > -2: return "逢高做空" else: return "逢高做空" def _get_suggestion_reason(symbol: str, suggestion: str) -> str: """获取建议理由""" reasons = { "逢低做多": "技术面突破,趋势明确,建议逢低介入", "逢高做空": "技术面走弱,下行压力增大", "观望等待": "多空力量均衡,等待方向明确" } return reasons.get(suggestion, "等待进一步信号") def _get_period_trends(candles: list) -> dict: """计算各周期趋势 - 根据不同周期取不同长度的K线计算""" period_config = { "5": {"bars": 10, "threshold": 0.003}, "15": {"bars": 15, "threshold": 0.005}, "30": {"bars": 20, "threshold": 0.008}, "60": {"bars": 30, "threshold": 0.01} } result = {} for period, cfg in period_config.items(): bars = cfg["bars"] threshold = cfg["threshold"] if len(candles) < bars: result[period] = "neutral" continue recent = candles[-bars:] first_close = float(recent[0].get("close", 0)) last_close = float(recent[-1].get("close", 0)) if first_close <= 0: result[period] = "neutral" continue change_pct = (last_close - first_close) / first_close if change_pct > threshold: result[period] = "up" elif change_pct < -threshold: result[period] = "down" else: result[period] = "neutral" return result def _calc_success_rate(candles: list) -> int: """计算交易成功率(简化版)""" if len(candles) < 10: return 50 wins = 0 for i in range(1, len(candles)): prev_close = float(candles[i-1].get("close", 0)) curr_close = float(candles[i].get("close", 0)) if curr_close >= prev_close: wins += 1 return int(wins / (len(candles) - 1) * 100) def _calc_trend_score(candles: list) -> int: """计算趋势评分(0-100)""" if len(candles) < 5: return 50 recent = candles[-10:] closes = [float(c.get("close", 0)) for c in recent] if len(closes) < 2: return 50 up_count = sum(1 for i in range(1, len(closes)) if closes[i] >= closes[i-1]) score = int(up_count / (len(closes) - 1) * 100) return max(0, min(100, score)) def _calc_ema(data: list, period: int) -> list: """计算EMA,返回与输入等长的列表,前面用None填充""" ema = [None] * len(data) multiplier = 2 / (period + 1) if len(data) < period: return ema ema[period - 1] = sum(data[:period]) / period for i in range(period, len(data)): ema[i] = (data[i] - ema[i-1]) * multiplier + ema[i-1] return ema def _calc_macd(candles: list) -> dict: """计算MACD指标""" if len(candles) < 26: return {"signal": "中性", "detail": "数据不足"} closes = [float(c.get("close", 0)) for c in candles] ema12 = _calc_ema(closes, 12) ema26 = _calc_ema(closes, 26) dif_list = [] for i in range(len(closes)): if ema12[i] is not None and ema26[i] is not None: dif_list.append(ema12[i] - ema26[i]) else: dif_list.append(None) # 只对有效DIF值计算DEA,避免None替换为0导致计算错误 dif_valid = [d for d in dif_list if d is not None] if dif_valid: dea_valid = _calc_ema(dif_valid, 9) dea_list = [None] * (len(dif_list) - len(dif_valid)) + dea_valid else: dea_list = [None] * len(dif_list) dif = dif_list[-1] dea = dea_list[-1] if dif is not None and dea is not None: if dif > dea: signal = "金叉" elif dif < dea: signal = "死叉" else: signal = "中性" else: signal = "中性" return {"signal": signal, "detail": f"DIF: {dif:.4f}"} def _calc_rsi(candles: list) -> dict: """计算RSI指标""" if len(candles) < 15: return {"value": 50, "status": "正常"} closes = [float(c.get("close", 0)) for c in candles[-15:]] gains = [] losses = [] for i in range(1, len(closes)): diff = closes[i] - closes[i-1] gains.append(max(0, diff)) losses.append(max(0, -diff)) avg_gain = sum(gains) / len(gains) if gains else 0 avg_loss = sum(losses) / len(losses) if losses else 0 if avg_loss == 0: rsi = 100 else: rs = avg_gain / avg_loss rsi = 100 - (100 / (1 + rs)) rsi = int(rsi) if rsi > 70: status = "超买" elif rsi < 30: status = "超卖" else: status = "正常" return {"value": rsi, "status": status} def _calc_boll(candles: list) -> dict: """计算布林带""" if len(candles) < 20: return {"signal": "中轨", "detail": "区间: --"} closes = [float(c.get("close", 0)) for c in candles[-20:]] ma = sum(closes) / len(closes) std = (sum((c - ma) ** 2 for c in closes) / len(closes)) ** 0.5 upper = ma + 2 * std lower = ma - 2 * std current = closes[-1] if current > upper: signal = "上轨外" elif current < lower: signal = "下轨外" elif current > ma: signal = "中轨上" else: signal = "中轨" return {"signal": signal, "detail": f"区间: {lower:.0f}-{upper:.0f}"} def _calc_kdj(candles: list) -> dict: """计算KDJ指标""" if len(candles) < 9: return {"signal": "中性", "detail": "K: -- D: --"} highs = [float(c.get("high", 0)) for c in candles[-9:]] lows = [float(c.get("low", 0)) for c in candles[-9:]] closes = [float(c.get("close", 0)) for c in candles[-9:]] highest = max(highs) lowest = min(lows) current = closes[-1] if highest == lowest: rsv = 50 else: rsv = (current - lowest) / (highest - lowest) * 100 k = int(rsv * 2 / 3 + 50 / 3) d = int(k * 2 / 3 + 50 / 3) if k > d: signal = "偏多" elif k < d: signal = "偏空" else: signal = "中性" return {"signal": signal, "detail": f"K: {k} D: {d}"} # ==================== 期货智析数据管理接口 ==================== @router.get("/analysis/history/{symbol}") def get_analysis_history(symbol: str, limit: int = 10, adb: Session = Depends(get_analysis_db)): """获取品种历史分析记录""" records = adb.query(FuturesAnalysis).filter( FuturesAnalysis.symbol == symbol ).order_by( FuturesAnalysis.analysis_time.desc() ).limit(limit).all() return { "success": True, "data": [{ "id": r.id, "symbol": r.symbol, "analysis_time": r.analysis_time.isoformat(), "suggestion": r.suggestion, "suggestion_type": r.suggestion_type, "trend_score": r.trend_score, "entry_price": r.entry_price, "target_price": r.target_price, "stop_loss": r.stop_loss, "risk_level": r.risk_level } for r in records] } @router.post("/analysis/save") def save_analysis_record(data: dict, adb: Session = Depends(get_analysis_db)): """保存分析记录到数据库""" try: record = FuturesAnalysis( symbol=data.get("symbol"), suggestion=data.get("suggestion"), suggestion_type=data.get("suggestion_type"), entry_price=data.get("entry_price"), target_price=data.get("target_price"), stop_loss=data.get("stop_loss"), risk_level=data.get("risk_level"), macd_signal=data.get("macd", {}).get("signal") if data.get("macd") else None, rsi_value=data.get("rsi", {}).get("value") if data.get("rsi") else None, boll_signal=data.get("boll", {}).get("signal") if data.get("boll") else None, kdj_signal=data.get("kdj", {}).get("signal") if data.get("kdj") else None, trend_score=data.get("trend_score"), success_rate=data.get("success_rate"), resistance_levels=data.get("resistances"), support_levels=data.get("supports"), period_trends=data.get("periodConsistency") ) adb.add(record) adb.commit() return {"success": True, "message": "分析记录已保存", "id": record.id} except Exception as e: adb.rollback() logger.error(f"保存分析记录失败: {e}") return {"success": False, "message": str(e)} # ==================== 关注品种管理 ==================== @router.get("/watched") def get_watched_symbols(adb: Session = Depends(get_analysis_db)): """获取关注的品种列表""" symbols = adb.query(WatchedSymbol).order_by(WatchedSymbol.created_at.desc()).all() return { "success": True, "data": [{ "id": s.id, "symbol": s.symbol, "name": s.name, "note": s.note, "created_at": s.created_at.isoformat() } for s in symbols] } @router.post("/watched") def add_watched_symbol(data: dict, adb: Session = Depends(get_analysis_db)): """添加关注品种""" try: symbol = data.get("symbol") existing = adb.query(WatchedSymbol).filter(WatchedSymbol.symbol == symbol).first() if existing: return {"success": False, "message": "该品种已关注"} new_symbol = WatchedSymbol( symbol=symbol, name=data.get("name"), note=data.get("note") ) adb.add(new_symbol) adb.commit() return {"success": True, "message": "已添加关注", "id": new_symbol.id} except Exception as e: adb.rollback() return {"success": False, "message": str(e)} @router.delete("/watched/{symbol}") def remove_watched_symbol(symbol: str, adb: Session = Depends(get_analysis_db)): """取消关注品种""" try: record = adb.query(WatchedSymbol).filter(WatchedSymbol.symbol == symbol).first() if not record: return {"success": False, "message": "未找到该品种"} adb.delete(record) adb.commit() return {"success": True, "message": "已取消关注"} except Exception as e: adb.rollback() return {"success": False, "message": str(e)} # ==================== AI模型配置管理 ==================== @router.get("/ai-models") def get_ai_models(adb: Session = Depends(get_analysis_db)): """获取AI模型配置列表""" models = adb.query(AIModelConfig).order_by(AIModelConfig.created_at.desc()).all() settings = adb.query(AnalysisSettings).filter( AnalysisSettings.key == "analysis_settings" ).first() return { "success": True, "data": { "models": [{ "id": m.id, "provider": m.provider, "model_name": m.model_name, "api_base": m.api_base, "model_id": m.model_id, "temperature": m.temperature, "max_tokens": m.max_tokens, "enabled": m.enabled, "is_active": m.is_active, "created_at": m.created_at.isoformat() } for m in models], "analysis_settings": settings.value if settings else { "enable_technical_analysis": True, "enable_fundamental_analysis": False, "enable_sentiment_analysis": False, "risk_tolerance": "medium", "max_position_pct": 10 } } } @router.post("/ai-models") def save_ai_model(data: dict, adb: Session = Depends(get_analysis_db)): """保存AI模型配置""" try: if data.get("action") == "save_settings": settings = adb.query(AnalysisSettings).filter( AnalysisSettings.key == "analysis_settings" ).first() if settings: settings.value = data.get("settings", {}) else: settings = AnalysisSettings( key="analysis_settings", value=data.get("settings", {}) ) adb.add(settings) adb.commit() return {"success": True, "message": "分析设置已保存"} model_data = data.get("model", {}) model = AIModelConfig( provider=model_data.get("provider", "custom"), model_name=model_data.get("model_name", ""), api_key=model_data.get("api_key", ""), api_base=model_data.get("api_base"), model_id=model_data.get("model_id"), temperature=model_data.get("temperature", 0.7), max_tokens=model_data.get("max_tokens", 2000), enabled=model_data.get("enabled", True), is_active=model_data.get("is_active", False) ) if model.is_active: adb.query(AIModelConfig).update({"is_active": False}) adb.add(model) adb.commit() return {"success": True, "message": "AI模型已保存", "id": model.id} except Exception as e: adb.rollback() return {"success": False, "message": str(e)} @router.put("/ai-models/{model_id}") def update_ai_model(model_id: int, data: dict, adb: Session = Depends(get_analysis_db)): """更新AI模型配置""" try: model = adb.query(AIModelConfig).filter(AIModelConfig.id == model_id).first() if not model: return {"success": False, "message": "模型不存在"} if "is_active" in data and data["is_active"]: adb.query(AIModelConfig).update({"is_active": False}) model.is_active = True else: for key, value in data.items(): if hasattr(model, key): setattr(model, key, value) adb.commit() return {"success": True, "message": "模型已更新"} except Exception as e: adb.rollback() return {"success": False, "message": str(e)} @router.delete("/ai-models/{model_id}") def delete_ai_model(model_id: int, adb: Session = Depends(get_analysis_db)): """删除AI模型配置""" try: model = adb.query(AIModelConfig).filter(AIModelConfig.id == model_id).first() if not model: return {"success": False, "message": "模型不存在"} adb.delete(model) adb.commit() return {"success": True, "message": "模型已删除"} except Exception as e: adb.rollback() return {"success": False, "message": str(e)} # ==================== 数据刷新接口 ==================== from app.services.cache import needs_refresh, get_symbol_timestamp from app.services.ai_analysis import AIFuturesAnalyzer refresh_lock = threading.Lock() refresh_status = {"running": False, "progress": 0, "total": 0, "message": ""} REFRESH_THRESHOLD = 300 # 5分钟阈值 def _refresh_single_symbol_sync(db: Session, symbol: str) -> dict: """同步刷新单个品种数据(会等待采集完成)""" try: # 先检查是否需要刷新 if not needs_refresh(db, symbol, "futures", REFRESH_THRESHOLD): last_refresh = get_symbol_timestamp(db, symbol, "futures") return { "success": True, "message": f"{symbol} 数据仍然新鲜,无需刷新", "last_refresh": last_refresh.isoformat() if last_refresh else None, "refreshed": False } # 需要刷新,执行采集 logger.info(f"开始刷新 {symbol} 数据...") result = fetch_symbol_data(symbol, "futures") if result.get("timeframes"): save_market_data(db, symbol, result) logger.info(f"{symbol} 数据刷新完成") return { "success": True, "message": f"{symbol} 数据已更新", "refreshed": True } return {"success": False, "message": f"{symbol} 未获取到数据", "refreshed": False} except Exception as e: logger.error(f"刷新 {symbol} 失败: {e}") return {"success": False, "message": f"{symbol} 刷新失败: {str(e)}", "refreshed": False} @router.post("/refresh/{symbol}") def refresh_single_symbol_api(symbol: str, db: Session = Depends(get_db)): """刷新单个品种合约数据(同步执行,检查时间戳)""" if refresh_lock.locked(): return {"success": False, "message": "数据刷新中,请稍后再试"} try: refresh_lock.acquire() result = _refresh_single_symbol_sync(db, symbol) return result finally: refresh_lock.release() @router.post("/refresh-all") def refresh_all_symbols_api(background_tasks: BackgroundTasks): """刷新所有品种合约数据(异步执行)""" global refresh_status if refresh_lock.locked(): return {"success": False, "message": "数据刷新中,请稍后再试"} # 从配置加载所有品种 config = _load_symbols_config() futures_config = config.get("futures", {}) symbols = list(futures_config.values()) def refresh_all_task(): global refresh_status # 在后台任务内部创建新的数据库会话 local_db = next(get_db()) try: with refresh_lock: refresh_status = {"running": True, "progress": 0, "total": len(symbols), "message": "开始刷新..."} for i, symbol in enumerate(symbols): refresh_status["message"] = f"正在刷新 {symbol} ({i + 1}/{len(symbols)})" refresh_status["progress"] = i + 1 _refresh_single_symbol_sync(local_db, symbol) with refresh_lock: refresh_status = {"running": False, "progress": len(symbols), "total": len(symbols), "message": "全部刷新完成"} finally: local_db.close() background_tasks.add_task(refresh_all_task) return {"success": True, "message": "开始刷新所有品种数据...", "count": len(symbols)} @router.get("/refresh-status") def get_refresh_status(): """获取刷新状态""" return {"success": True, "data": refresh_status} # ==================== AI智能分析接口 ==================== @router.post("/ai-analysis/{symbol}") def run_ai_analysis(symbol: str, db: Session = Depends(get_db), analysis_db: Session = Depends(get_analysis_db)): """执行AI智能分析""" try: analyzer = AIFuturesAnalyzer(db, analysis_db) result = analyzer.analyze(symbol) if result.get("success"): return { "success": True, "data": result["data"] } else: return { "success": False, "error": result.get("error", "AI分析失败") } except Exception as e: logger.error(f"AI分析失败: {e}") return { "success": False, "error": f"AI分析失败: {str(e)}" } @router.get("/ai-analysis/{symbol}") def get_ai_analysis(symbol: str, force_refresh: bool = False, db: Session = Depends(get_db), analysis_db: Session = Depends(get_analysis_db)): """获取AI分析结果(智能判断是否需要重新分析)""" try: analyzer = AIFuturesAnalyzer(db, analysis_db) if force_refresh: logger.info(f"强制刷新: {symbol}") result = analyzer.analyze(symbol) if result.get("success"): return { "success": True, "data": result["data"], "is_cached": False } else: return { "success": False, "error": result.get("error", "AI分析失败") } # 获取最新缓存 cache = analyzer.get_latest_cache(symbol) if cache: # 智能判断是否需要重新分析 if analyzer.should_reanalyze(symbol, cache): logger.info(f"检测到数据变化或超时,自动重新分析: {symbol}") result = analyzer.analyze(symbol) if result.get("success"): return { "success": True, "data": result["data"], "is_cached": False } else: # 如果重新分析失败,返回旧缓存 logger.warning(f"重新分析失败,返回旧缓存: {symbol}") return { "success": True, "data": { "id": cache.id, "symbol": cache.symbol, "analysis_time": cache.created_at.isoformat(), "result": cache.analysis_data }, "is_cached": True, "warning": "分析数据可能不是最新的" } # 返回缓存数据 return { "success": True, "data": { "id": cache.id, "symbol": cache.symbol, "analysis_time": cache.created_at.isoformat(), "result": cache.analysis_data }, "is_cached": True } # 没有缓存,执行分析 result = analyzer.analyze(symbol) if result.get("success"): return { "success": True, "data": result["data"], "is_cached": False } else: return { "success": False, "error": result.get("error", "未找到分析结果") } except Exception as e: logger.error(f"获取AI分析结果失败: {e}") return { "success": False, "error": f"获取AI分析失败: {str(e)}" } @router.get("/ai-analysis/{symbol}/history") def get_ai_analysis_history(symbol: str, limit: int = 20, analysis_db: Session = Depends(get_analysis_db)): """获取AI分析历史记录""" try: records = analysis_db.query(AIAnalysisCache).filter( AIAnalysisCache.symbol == symbol ).order_by( AIAnalysisCache.created_at.desc() ).limit(limit).all() return { "success": True, "data": [{ "id": r.id, "symbol": r.symbol, "analysis_time": r.created_at.isoformat(), "analysis_data": r.analysis_data } for r in records] } except Exception as e: logger.error(f"获取AI分析历史失败: {e}") return { "success": False, "error": f"获取历史记录失败: {str(e)}" } @router.get("/ai-analysis/history/{record_id}") def get_ai_analysis_detail(record_id: int, analysis_db: Session = Depends(get_analysis_db)): """获取单条AI分析记录详情""" try: record = analysis_db.query(AIAnalysisCache).filter( AIAnalysisCache.id == record_id ).first() if not record: return { "success": False, "error": "记录不存在" } return { "success": True, "data": { "id": record.id, "symbol": record.symbol, "analysis_time": record.created_at.isoformat(), "analysis_data": record.analysis_data } } except Exception as e: logger.error(f"获取AI分析详情失败: {e}") return { "success": False, "error": f"获取记录详情失败: {str(e)}" }