""" AI分析服务 - 期货四维联合分析 """ import json import re import logging from datetime import datetime from typing import Dict, List, Optional from sqlalchemy.orm import Session from app.analysis_models import AIAnalysisCache from app.services.cache import get_cached_data, get_latest_cached from pathlib import Path CONFIG_DIR = Path(__file__).resolve().parent.parent.parent / "config" AI_CONFIG_FILE = CONFIG_DIR / "ai_config.json" logger = logging.getLogger(__name__) class AIAnalysisPrompt: """AI分析提示词管理器""" SYSTEM_PROMPT = """你是一位拥有20年实战经验的资深金融交易分析师,精通A股市场与商品期货的技术分析。 你的核心使命是基于提供的K线数据,执行【四维联合判断分析法(4D-XV)】,并提供包含风控红线审查的客观交易策略。 你的分析必须遵循以下原则: 1. 数据驱动:所有结论必须基于输入的JSON数据,严禁凭空捏造。 2. 四维共振:任何交易建议必须经过MACD(趋势)、成交量(资金)、KDJ(时机)、多周期(方向)的交叉验证。 3. 红线否决:如果数据触发【17条交易红线】,必须直接给出【禁止交易】或【止损】的建议。 4. 客观中立:不使用绝对化表述,提供情景预案(概率估算)。 5. 动态切换:遇到关键位放量突破时,立即切换右侧思维,不逆势死扛。""" ANALYSIS_TEMPLATE = """请严格按照以下JSON格式输出分析结果(不要输出任何其他内容): { "summary": "一句话总结当前市场状态", "four_dimensional": { "60min": { "macd": {"trend": "up/down/neutral", "histogram": "放大/缩小/背离", "position": "零轴上/下"}, "volume": {"status": "放量上涨/缩量回调/趋势量能/拐点量能", "ratio": 1.5}, "kdj": {"k": 85, "d": 80, "j": 95, "status": "超买/超卖/中性", "signal": "金叉/死叉/钝化"}, "conclusion": "定大势结论" }, "30min": { "macd": {"trend": "up/down/neutral", "histogram": "放大/缩小/背离", "position": "零轴上/下"}, "volume": {"status": "放量/缩量/正常", "ratio": 1.5}, "kdj": {"k": 50, "d": 45, "j": 60, "status": "中性", "signal": "金叉/死叉"}, "conclusion": "找拐点结论" }, "15min": { "macd": {"trend": "up/down/neutral", "histogram": "放大/缩小/背离", "position": "零轴上/下"}, "volume": {"status": "放量/缩量/正常", "ratio": 2.0}, "kdj": {"k": 30, "d": 25, "j": 40, "status": "超卖", "signal": "金叉/死叉"}, "conclusion": "择入场结论" } }, "kdj_diagnosis": { "current_status": "超买/超卖/中性区域", "divergence": "是否存在顶/底背离", "paralysis": "是否钝化(持续>6根K线)", "recommendation": "KDJ使用建议" }, "pivot_points": { "r2": 7500, "r1": 7350, "pp": 7200, "s1": 7050, "s2": 6900, "validation": { "test_count": 3, "volume_confirmed": true, "multi_period_resonance": true, "breakback_confirmed": false } }, "red_lines_check": { "passed": true, "violated": [], "warnings": ["暂无红线警告"] }, "discipline_score": { "total": 9, "max": 11, "details": { "trend": true, "position": true, "signal": true, "risk": true, "mindset": true } }, "trading_suggestion": { "direction": "做多/做空/观望", "confidence": 75, "entry_range": {"min": 7050, "max": 7100}, "stop_loss": 6950, "take_profit": [{"price": 7200, "ratio": 50}, {"price": 7350, "ratio": 30}, {"price": 7500, "ratio": 20}], "position_size": "轻仓/半仓/重仓", "reason": "做多理由" }, "scenario_plans": { "breakthrough": {"probability": 35, "action": "放量突破关键位,跟随右侧思维"}, "consolidation": {"probability": 40, "action": "R1-S1区间内高抛低吸"}, "reversal": {"probability": 15, "action": "MACD顶/底背离+量能不足,立即止损反手"}, "news_impact": {"probability": 10, "action": "减仓50%规避不确定性"} }, "risk_warnings": [ "技术指标具有滞后性,历史表现不代表未来", "需结合基本面和市场情绪综合判断" ], "experience_lessons": [ "警惕缩量创新高,可能是诱多信号", "KDJ超买钝化中不宜逆势做空" ] }""" @classmethod def build_prompt(cls, symbol: str, data: Dict) -> str: """构建完整的AI分析提示词""" prompt = f"""{cls.SYSTEM_PROMPT} 现在请分析以下期货品种的K线数据: ## 品种信息 - 合约代码:{symbol} - 当前价格:{data.get('current_price', 'N/A')} ## 多周期K线数据 ```json {json.dumps(data, ensure_ascii=False, indent=2)} ``` {cls.ANALYSIS_TEMPLATE}""" return prompt class AIFuturesAnalyzer: """AI期货分析器""" def __init__(self, db: Session, analysis_db: Session = None): self.db = db self.analysis_db = analysis_db or db def get_active_model(self) -> Optional[Dict]: """获取当前激活的AI模型配置""" try: if not AI_CONFIG_FILE.exists(): return None with open(AI_CONFIG_FILE, "r", encoding="utf-8") as f: config = json.load(f) models = config.get("models", []) active_model_name = config.get("active_model") if active_model_name: for model in models: if model.get("model_name") == active_model_name and model.get("enabled", True): return model for model in models: if model.get("enabled", True): logger.warning(f"未找到匹配的激活模型,使用第一个启用的模型: {model.get('model_name')}") return model return None except Exception as e: logger.error(f"加载AI配置失败: {e}") return None def prepare_multi_period_data(self, symbol: str) -> Optional[Dict]: """准备多周期数据用于AI分析""" cached_data = get_cached_data( self.db, symbol, "futures", ["5min", "15min", "30min", "60min", "daily"] ) if not cached_data or not cached_data.get("timeframes"): return None timeframes = cached_data.get("timeframes", {}) current_price = cached_data.get("current_price") result = { "symbol": symbol, "current_price": current_price, "timeframes": {} } for period_name, db_period in [("5min", "5min"), ("15min", "15min"), ("30min", "30min"), ("60min", "60min"), ("daily", "daily")]: if period_name in timeframes and timeframes[period_name]: candles = timeframes[period_name] if len(candles) >= 20: result["timeframes"][period_name] = self._analyze_timeframe(candles, period_name) return result def _analyze_timeframe(self, candles: List[Dict], period: str) -> Dict: """分析单个周期的技术指标""" if not candles or len(candles) < 20: return {} closes = [float(c.get("close", 0)) for c in candles] highs = [float(c.get("high", 0)) for c in candles] lows = [float(c.get("low", 0)) for c in candles] volumes = [float(c.get("volume", 0)) for c in candles] ma10 = sum(closes[-10:]) / 10 if len(closes) >= 10 else None ma20 = sum(closes[-20:]) / 20 if len(closes) >= 20 else None macd_data = self._calc_macd(closes) kdj_data = self._calc_kdj(highs, lows, closes) avg_volume = sum(volumes[-20:]) / 20 if len(volumes) >= 20 else 0 current_volume = volumes[-1] if volumes else 0 return { "trend": "up" if closes[-1] > closes[0] else "down", "ma10": round(ma10, 2) if ma10 else None, "ma20": round(ma20, 2) if ma20 else None, "macd_dif": round(macd_data["dif"], 4), "macd_dea": round(macd_data["dea"], 4), "macd_histogram": round(macd_data["histogram"], 4), "kdj_k": kdj_data["k"], "kdj_d": kdj_data["d"], "kdj_j": kdj_data["j"], "volume_avg": round(avg_volume, 2), "volume_current": round(current_volume, 2), "volume_ratio": round(current_volume / avg_volume, 2) if avg_volume > 0 else 1, "candles": candles[-10:] if len(candles) > 10 else candles } def _calc_macd(self, closes: List[float]) -> Dict: """计算MACD指标""" if len(closes) < 26: return {"dif": 0, "dea": 0, "histogram": 0} ema12 = self._calc_ema(closes, 12) ema26 = self._calc_ema(closes, 26) dif = ema12 - ema26 dea = self._calc_ema([dif] * len(closes), 9) histogram = 2 * (dif - dea) return {"dif": dif, "dea": dea, "histogram": histogram} def _calc_ema(self, data: List[float], period: int) -> float: """计算EMA""" if len(data) < period: return sum(data) / len(data) if data else 0 multiplier = 2 / (period + 1) ema = sum(data[:period]) / period for i in range(period, len(data)): ema = (data[i] - ema) * multiplier + ema return ema def _calc_kdj(self, highs: List[float], lows: List[float], closes: List[float]) -> Dict: """计算KDJ指标""" if len(closes) < 9: return {"k": 50, "d": 50, "j": 50} period = 9 recent_highs = highs[-period:] recent_lows = lows[-period:] recent_closes = closes[-period:] highest = max(recent_highs) lowest = min(recent_lows) current = recent_closes[-1] if highest == lowest: rsv = 50 else: rsv = (current - lowest) / (highest - lowest) * 100 k = rsv * 2 / 3 + 50 / 3 d = k * 2 / 3 + 50 / 3 j = 3 * k - 2 * d return {"k": round(k, 2), "d": round(d, 2), "j": round(j, 2)} def call_ai_model(self, prompt: str, model: Dict) -> Optional[str]: """调用AI模型""" try: import requests api_base = model.get("api_base", "https://api.openai.com/v1") api_key = model.get("api_key", "") model_id = model.get("model_id") or model.get("model_name", "") logger.info(f"========== AI模型调用开始 ==========") logger.info(f"API Base: {api_base}") logger.info(f"API Key: {'已配置' if api_key else '未配置'} ({api_key[:10]}...)" if api_key else "API Key: 未配置") logger.info(f"Model ID: {model_id}") logger.info(f"Temperature: {model.get('temperature', 0.7)}") logger.info(f"Max Tokens: {model.get('max_tokens', 2000)}") if not api_key: logger.error("API Key 未配置,无法调用AI模型") return None headers = { "Content-Type": "application/json", "Authorization": f"Bearer {api_key}" } payload = { "model": model_id, "messages": [ {"role": "user", "content": prompt} ], "temperature": model.get("temperature", 0.7), "max_tokens": model.get("max_tokens", 2000) } url = f"{api_base}/chat/completions" logger.info(f"请求 URL: {url}") logger.info(f"请求 Payload 大小: {len(str(payload))} 字符") response = requests.post( url, headers=headers, json=payload, timeout=180 # 增加到180秒(3分钟)超时 ) logger.info(f"响应状态码: {response.status_code}") if response.status_code == 200: result = response.json() content = result["choices"][0]["message"]["content"] logger.info(f"AI响应成功,内容长度: {len(content)} 字符") logger.info(f"========== AI模型调用成功 ==========") return content else: logger.error(f"AI模型调用失败:") logger.error(f" 状态码: {response.status_code}") logger.error(f" 响应内容: {response.text}") logger.error(f"========== AI模型调用失败 ==========") return None except requests.exceptions.Timeout: logger.error("AI模型调用超时(超过60秒)") logger.error(f"========== AI模型调用失败 ==========") return None except requests.exceptions.ConnectionError as e: logger.error(f"AI模型连接错误: {e}") logger.error(f"========== AI模型调用失败 ==========") return None except requests.exceptions.RequestException as e: logger.error(f"AI模型请求异常: {e}") logger.error(f"========== AI模型调用失败 ==========") return None except Exception as e: logger.error(f"调用AI模型未知异常: {e}", exc_info=True) logger.error(f"========== AI模型调用失败 ==========") return None def parse_ai_response(self, response: str) -> Optional[Dict]: """解析AI返回的JSON响应""" try: json_match = re.search(r'\{[\s\S]*\}', response) if json_match: return json.loads(json_match.group(0)) return None except Exception as e: logger.error(f"解析AI响应失败: {e}") return None def save_analysis_cache(self, symbol: str, analysis_data: Dict) -> AIAnalysisCache: """保存AI分析结果到缓存""" cache = AIAnalysisCache( symbol=symbol, analysis_data=analysis_data, created_at=datetime.now() ) self.analysis_db.add(cache) self.analysis_db.commit() self.analysis_db.refresh(cache) return cache def get_latest_cache(self, symbol: str) -> Optional[AIAnalysisCache]: """获取最新的AI分析缓存""" return self.analysis_db.query(AIAnalysisCache).filter( AIAnalysisCache.symbol == symbol ).order_by(AIAnalysisCache.created_at.desc()).first() def analyze(self, symbol: str) -> Dict: """执行完整的AI分析流程""" logger.info(f"===== 开始AI分析: {symbol} =====") model = self.get_active_model() if not model: logger.error("未找到激活的AI模型配置") return { "success": False, "error": "未配置AI模型或模型未激活" } logger.info(f"使用AI模型: {model.get('model_name')}") data = self.prepare_multi_period_data(symbol) if not data: logger.error(f"未找到 {symbol} 的市场数据") return { "success": False, "error": f"未找到 {symbol} 的市场数据" } logger.info(f"市场数据准备成功,包含周期: {list(data.get('timeframes', {}).keys())}") prompt = AIAnalysisPrompt.build_prompt(symbol, data) logger.info(f"AI提示词生成完成,长度: {len(prompt)} 字符") response = self.call_ai_model(prompt, model) if not response: logger.error("AI模型返回空响应") return { "success": False, "error": "AI模型调用失败" } logger.info(f"AI模型响应接收成功,长度: {len(response)} 字符") analysis_result = self.parse_ai_response(response) if not analysis_result: logger.error(f"AI响应解析失败,原始响应前100字符: {response[:100]}") return { "success": False, "error": "AI响应解析失败" } logger.info(f"AI响应解析成功") cache = self.save_analysis_cache(symbol, analysis_result) logger.info(f"分析结果已保存到缓存,ID: {cache.id}") logger.info(f"===== AI分析完成: {symbol} =====") return { "success": True, "data": { "id": cache.id, "symbol": symbol, "analysis_time": cache.created_at.isoformat(), "result": analysis_result } }