You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

460 lines
17 KiB

"""
AI分析服务 - 期货四维联合分析
"""
import json
import re
import logging
from datetime import datetime
from typing import Dict, List, Optional
from sqlalchemy.orm import Session
from app.analysis_models import AIAnalysisCache
from app.services.cache import get_cached_data, get_latest_cached
from pathlib import Path
CONFIG_DIR = Path(__file__).resolve().parent.parent.parent / "config"
AI_CONFIG_FILE = CONFIG_DIR / "ai_config.json"
logger = logging.getLogger(__name__)
class AIAnalysisPrompt:
"""AI分析提示词管理器"""
SYSTEM_PROMPT = """你是一位拥有20年实战经验的资深金融交易分析师精通A股市场与商品期货的技术分析。
你的核心使命是基于提供的K线数据执行四维联合判断分析法(4D-XV)并提供包含风控红线审查的客观交易策略
你的分析必须遵循以下原则
1. 数据驱动所有结论必须基于输入的JSON数据严禁凭空捏造
2. 四维共振任何交易建议必须经过MACD趋势成交量资金KDJ时机多周期方向的交叉验证
3. 红线否决如果数据触发17条交易红线必须直接给出禁止交易止损的建议
4. 客观中立不使用绝对化表述提供情景预案概率估算
5. 动态切换遇到关键位放量突破时立即切换右侧思维不逆势死扛"""
ANALYSIS_TEMPLATE = """请严格按照以下JSON格式输出分析结果不要输出任何其他内容
{
"summary": "一句话总结当前市场状态",
"four_dimensional": {
"60min": {
"macd": {"trend": "up/down/neutral", "histogram": "放大/缩小/背离", "position": "零轴上/下"},
"volume": {"status": "放量上涨/缩量回调/趋势量能/拐点量能", "ratio": 1.5},
"kdj": {"k": 85, "d": 80, "j": 95, "status": "超买/超卖/中性", "signal": "金叉/死叉/钝化"},
"conclusion": "定大势结论"
},
"30min": {
"macd": {"trend": "up/down/neutral", "histogram": "放大/缩小/背离", "position": "零轴上/下"},
"volume": {"status": "放量/缩量/正常", "ratio": 1.5},
"kdj": {"k": 50, "d": 45, "j": 60, "status": "中性", "signal": "金叉/死叉"},
"conclusion": "找拐点结论"
},
"15min": {
"macd": {"trend": "up/down/neutral", "histogram": "放大/缩小/背离", "position": "零轴上/下"},
"volume": {"status": "放量/缩量/正常", "ratio": 2.0},
"kdj": {"k": 30, "d": 25, "j": 40, "status": "超卖", "signal": "金叉/死叉"},
"conclusion": "择入场结论"
}
},
"kdj_diagnosis": {
"current_status": "超买/超卖/中性区域",
"divergence": "是否存在顶/底背离",
"paralysis": "是否钝化(持续>6根K线",
"recommendation": "KDJ使用建议"
},
"pivot_points": {
"r2": 7500,
"r1": 7350,
"pp": 7200,
"s1": 7050,
"s2": 6900,
"validation": {
"test_count": 3,
"volume_confirmed": true,
"multi_period_resonance": true,
"breakback_confirmed": false
}
},
"red_lines_check": {
"passed": true,
"violated": [],
"warnings": ["暂无红线警告"]
},
"discipline_score": {
"total": 9,
"max": 11,
"details": {
"trend": true,
"position": true,
"signal": true,
"risk": true,
"mindset": true
}
},
"trading_suggestion": {
"direction": "做多/做空/观望",
"confidence": 75,
"entry_range": {"min": 7050, "max": 7100},
"stop_loss": 6950,
"take_profit": [{"price": 7200, "ratio": 50}, {"price": 7350, "ratio": 30}, {"price": 7500, "ratio": 20}],
"position_size": "轻仓/半仓/重仓",
"reason": "做多理由"
},
"scenario_plans": {
"breakthrough": {"probability": 35, "action": "放量突破关键位,跟随右侧思维"},
"consolidation": {"probability": 40, "action": "R1-S1区间内高抛低吸"},
"reversal": {"probability": 15, "action": "MACD顶/底背离+量能不足,立即止损反手"},
"news_impact": {"probability": 10, "action": "减仓50%规避不确定性"}
},
"risk_warnings": [
"技术指标具有滞后性,历史表现不代表未来",
"需结合基本面和市场情绪综合判断"
],
"experience_lessons": [
"警惕缩量创新高,可能是诱多信号",
"KDJ超买钝化中不宜逆势做空"
]
}"""
@classmethod
def build_prompt(cls, symbol: str, data: Dict) -> str:
"""构建完整的AI分析提示词"""
prompt = f"""{cls.SYSTEM_PROMPT}
现在请分析以下期货品种的K线数据
## 品种信息
- 合约代码{symbol}
- 当前价格{data.get('current_price', 'N/A')}
## 多周期K线数据
```json
{json.dumps(data, ensure_ascii=False, indent=2)}
```
{cls.ANALYSIS_TEMPLATE}"""
return prompt
class AIFuturesAnalyzer:
"""AI期货分析器"""
def __init__(self, db: Session, analysis_db: Session = None):
self.db = db
self.analysis_db = analysis_db or db
def get_active_model(self) -> Optional[Dict]:
"""获取当前激活的AI模型配置"""
try:
if not AI_CONFIG_FILE.exists():
return None
with open(AI_CONFIG_FILE, "r", encoding="utf-8") as f:
config = json.load(f)
models = config.get("models", [])
active_model_name = config.get("active_model")
if active_model_name:
for model in models:
if model.get("model_name") == active_model_name and model.get("enabled", True):
return model
for model in models:
if model.get("enabled", True):
logger.warning(f"未找到匹配的激活模型,使用第一个启用的模型: {model.get('model_name')}")
return model
return None
except Exception as e:
logger.error(f"加载AI配置失败: {e}")
return None
def prepare_multi_period_data(self, symbol: str) -> Optional[Dict]:
"""准备多周期数据用于AI分析"""
cached_data = get_cached_data(
self.db,
symbol,
"futures",
["5min", "15min", "30min", "60min", "daily"]
)
if not cached_data or not cached_data.get("timeframes"):
return None
timeframes = cached_data.get("timeframes", {})
current_price = cached_data.get("current_price")
result = {
"symbol": symbol,
"current_price": current_price,
"timeframes": {}
}
for period_name, db_period in [("5min", "5min"), ("15min", "15min"),
("30min", "30min"), ("60min", "60min"),
("daily", "daily")]:
if period_name in timeframes and timeframes[period_name]:
candles = timeframes[period_name]
if len(candles) >= 20:
result["timeframes"][period_name] = self._analyze_timeframe(candles, period_name)
return result
def _analyze_timeframe(self, candles: List[Dict], period: str) -> Dict:
"""分析单个周期的技术指标"""
if not candles or len(candles) < 20:
return {}
closes = [float(c.get("close", 0)) for c in candles]
highs = [float(c.get("high", 0)) for c in candles]
lows = [float(c.get("low", 0)) for c in candles]
volumes = [float(c.get("volume", 0)) for c in candles]
ma10 = sum(closes[-10:]) / 10 if len(closes) >= 10 else None
ma20 = sum(closes[-20:]) / 20 if len(closes) >= 20 else None
macd_data = self._calc_macd(closes)
kdj_data = self._calc_kdj(highs, lows, closes)
avg_volume = sum(volumes[-20:]) / 20 if len(volumes) >= 20 else 0
current_volume = volumes[-1] if volumes else 0
return {
"trend": "up" if closes[-1] > closes[0] else "down",
"ma10": round(ma10, 2) if ma10 else None,
"ma20": round(ma20, 2) if ma20 else None,
"macd_dif": round(macd_data["dif"], 4),
"macd_dea": round(macd_data["dea"], 4),
"macd_histogram": round(macd_data["histogram"], 4),
"kdj_k": kdj_data["k"],
"kdj_d": kdj_data["d"],
"kdj_j": kdj_data["j"],
"volume_avg": round(avg_volume, 2),
"volume_current": round(current_volume, 2),
"volume_ratio": round(current_volume / avg_volume, 2) if avg_volume > 0 else 1,
"candles": candles[-10:] if len(candles) > 10 else candles
}
def _calc_macd(self, closes: List[float]) -> Dict:
"""计算MACD指标"""
if len(closes) < 26:
return {"dif": 0, "dea": 0, "histogram": 0}
ema12 = self._calc_ema(closes, 12)
ema26 = self._calc_ema(closes, 26)
dif = ema12 - ema26
dea = self._calc_ema([dif] * len(closes), 9)
histogram = 2 * (dif - dea)
return {"dif": dif, "dea": dea, "histogram": histogram}
def _calc_ema(self, data: List[float], period: int) -> float:
"""计算EMA"""
if len(data) < period:
return sum(data) / len(data) if data else 0
multiplier = 2 / (period + 1)
ema = sum(data[:period]) / period
for i in range(period, len(data)):
ema = (data[i] - ema) * multiplier + ema
return ema
def _calc_kdj(self, highs: List[float], lows: List[float], closes: List[float]) -> Dict:
"""计算KDJ指标"""
if len(closes) < 9:
return {"k": 50, "d": 50, "j": 50}
period = 9
recent_highs = highs[-period:]
recent_lows = lows[-period:]
recent_closes = closes[-period:]
highest = max(recent_highs)
lowest = min(recent_lows)
current = recent_closes[-1]
if highest == lowest:
rsv = 50
else:
rsv = (current - lowest) / (highest - lowest) * 100
k = rsv * 2 / 3 + 50 / 3
d = k * 2 / 3 + 50 / 3
j = 3 * k - 2 * d
return {"k": round(k, 2), "d": round(d, 2), "j": round(j, 2)}
def call_ai_model(self, prompt: str, model: Dict) -> Optional[str]:
"""调用AI模型"""
try:
import requests
api_base = model.get("api_base", "https://api.openai.com/v1")
api_key = model.get("api_key", "")
model_id = model.get("model_id") or model.get("model_name", "")
logger.info(f"========== AI模型调用开始 ==========")
logger.info(f"API Base: {api_base}")
logger.info(f"API Key: {'已配置' if api_key else '未配置'} ({api_key[:10]}...)" if api_key else "API Key: 未配置")
logger.info(f"Model ID: {model_id}")
logger.info(f"Temperature: {model.get('temperature', 0.7)}")
logger.info(f"Max Tokens: {model.get('max_tokens', 2000)}")
if not api_key:
logger.error("API Key 未配置无法调用AI模型")
return None
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
payload = {
"model": model_id,
"messages": [
{"role": "user", "content": prompt}
],
"temperature": model.get("temperature", 0.7),
"max_tokens": model.get("max_tokens", 2000)
}
url = f"{api_base}/chat/completions"
logger.info(f"请求 URL: {url}")
logger.info(f"请求 Payload 大小: {len(str(payload))} 字符")
response = requests.post(
url,
headers=headers,
json=payload,
timeout=180 # 增加到180秒3分钟超时
)
logger.info(f"响应状态码: {response.status_code}")
if response.status_code == 200:
result = response.json()
content = result["choices"][0]["message"]["content"]
logger.info(f"AI响应成功内容长度: {len(content)} 字符")
logger.info(f"========== AI模型调用成功 ==========")
return content
else:
logger.error(f"AI模型调用失败:")
logger.error(f" 状态码: {response.status_code}")
logger.error(f" 响应内容: {response.text}")
logger.error(f"========== AI模型调用失败 ==========")
return None
except requests.exceptions.Timeout:
logger.error("AI模型调用超时超过60秒")
logger.error(f"========== AI模型调用失败 ==========")
return None
except requests.exceptions.ConnectionError as e:
logger.error(f"AI模型连接错误: {e}")
logger.error(f"========== AI模型调用失败 ==========")
return None
except requests.exceptions.RequestException as e:
logger.error(f"AI模型请求异常: {e}")
logger.error(f"========== AI模型调用失败 ==========")
return None
except Exception as e:
logger.error(f"调用AI模型未知异常: {e}", exc_info=True)
logger.error(f"========== AI模型调用失败 ==========")
return None
def parse_ai_response(self, response: str) -> Optional[Dict]:
"""解析AI返回的JSON响应"""
try:
json_match = re.search(r'\{[\s\S]*\}', response)
if json_match:
return json.loads(json_match.group(0))
return None
except Exception as e:
logger.error(f"解析AI响应失败: {e}")
return None
def save_analysis_cache(self, symbol: str, analysis_data: Dict) -> AIAnalysisCache:
"""保存AI分析结果到缓存"""
cache = AIAnalysisCache(
symbol=symbol,
analysis_data=analysis_data,
created_at=datetime.now()
)
self.analysis_db.add(cache)
self.analysis_db.commit()
self.analysis_db.refresh(cache)
return cache
def get_latest_cache(self, symbol: str) -> Optional[AIAnalysisCache]:
"""获取最新的AI分析缓存"""
return self.analysis_db.query(AIAnalysisCache).filter(
AIAnalysisCache.symbol == symbol
).order_by(AIAnalysisCache.created_at.desc()).first()
def analyze(self, symbol: str) -> Dict:
"""执行完整的AI分析流程"""
logger.info(f"===== 开始AI分析: {symbol} =====")
model = self.get_active_model()
if not model:
logger.error("未找到激活的AI模型配置")
return {
"success": False,
"error": "未配置AI模型或模型未激活"
}
logger.info(f"使用AI模型: {model.get('model_name')}")
data = self.prepare_multi_period_data(symbol)
if not data:
logger.error(f"未找到 {symbol} 的市场数据")
return {
"success": False,
"error": f"未找到 {symbol} 的市场数据"
}
logger.info(f"市场数据准备成功,包含周期: {list(data.get('timeframes', {}).keys())}")
prompt = AIAnalysisPrompt.build_prompt(symbol, data)
logger.info(f"AI提示词生成完成长度: {len(prompt)} 字符")
response = self.call_ai_model(prompt, model)
if not response:
logger.error("AI模型返回空响应")
return {
"success": False,
"error": "AI模型调用失败"
}
logger.info(f"AI模型响应接收成功长度: {len(response)} 字符")
analysis_result = self.parse_ai_response(response)
if not analysis_result:
logger.error(f"AI响应解析失败原始响应前100字符: {response[:100]}")
return {
"success": False,
"error": "AI响应解析失败"
}
logger.info(f"AI响应解析成功")
cache = self.save_analysis_cache(symbol, analysis_result)
logger.info(f"分析结果已保存到缓存ID: {cache.id}")
logger.info(f"===== AI分析完成: {symbol} =====")
return {
"success": True,
"data": {
"id": cache.id,
"symbol": symbol,
"analysis_time": cache.created_at.isoformat(),
"result": analysis_result
}
}