You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

296 lines
11 KiB

# AI 研判模块
import json
import requests
from typing import Dict, Optional, List
from qihuo_analyzer.utils.config_manager import config_manager
from qihuo_analyzer.core.models import AnalysisResult
class DeepseekAgent:
"""DeepSeek AI 研判代理"""
def __init__(self):
self.api_key = config_manager.deepseek_api_key
self.api_url = config_manager.deepseek_api_url
self.headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.api_key}'
}
def analyze_market(self, market_data: Dict, technical_indicators: Dict,
trend_analysis: Dict, risk_metrics: Dict) -> Dict:
"""分析市场"""
# 构建提示词
prompt = self._build_analysis_prompt(market_data, technical_indicators, trend_analysis, risk_metrics)
# 调用API
response = self._call_deepseek_api(prompt)
# 解析结果
analysis_result = self._parse_analysis_result(response)
return analysis_result
def generate_trade_recommendation(self, analysis_result: Dict) -> Dict:
"""生成交易建议"""
# 构建提示词
prompt = self._build_recommendation_prompt(analysis_result)
# 调用API
response = self._call_deepseek_api(prompt)
# 解析结果
recommendation = self._parse_recommendation_result(response)
return recommendation
def _build_analysis_prompt(self, market_data: Dict, technical_indicators: Dict,
trend_analysis: Dict, risk_metrics: Dict) -> str:
"""构建分析提示词"""
prompt = f"""# 期货市场分析任务
你是一位专业的期货市场分析师需要基于以下多维度数据对市场进行综合研判
## 1. 市场基本数据
- 品种{market_data.get('symbol', '未知')}
- 最新价格{market_data.get('latest_price', '未知')}
- 成交量{market_data.get('volume', '未知')}
- 持仓量{market_data.get('open_interest', '未知')}
- 时间周期{market_data.get('timeframe', '未知')}
## 2. 技术指标数据
- MACD{json.dumps(technical_indicators.get('macd', {}), ensure_ascii=False)}
- RSI{technical_indicators.get('rsi', '未知')}
- 布林带{json.dumps(technical_indicators.get('bollinger', {}), ensure_ascii=False)}
- KDJ{json.dumps(technical_indicators.get('kdj', {}), ensure_ascii=False)}
- ATR{technical_indicators.get('atr', '未知')}
## 3. 趋势分析数据
- ADX{trend_analysis.get('adx', '未知')}
- 趋势强度{trend_analysis.get('trend_strength', '未知')}
- 趋势方向{trend_analysis.get('trend_direction', '未知')}
- 双均线关系{trend_analysis.get('ma_relationship', '未知')}
- 多周期共振{json.dumps(trend_analysis.get('multi_period_analysis', {}), ensure_ascii=False)}
- 综合趋势{trend_analysis.get('overall_trend', '未知')}
- 胜率{trend_analysis.get('win_rate', '未知')}%
## 4. 风险指标数据
- 止损位{risk_metrics.get('stop_loss', '未知')}
- 目标价{risk_metrics.get('target_price', '未知')}
- 盈亏比{risk_metrics.get('profit_loss_ratio', '未知')}
- 建议仓位{risk_metrics.get('position_size', '未知')}
- 风险比例{risk_metrics.get('risk_ratio', '未知')}%
## 分析要求
1. **趋势判断**基于多维度数据判断当前市场的主要趋势
2. **胜率评估**评估当前交易机会的胜率
3. **风险预警**识别潜在的风险因素
4. **交易建议**给出具体的交易方向仓位止损止盈建议
5. **逻辑解释**详细说明分析逻辑和依据
请以JSON格式输出分析结果包含以下字段
- trend_judgment趋势判断
- win_rate_assessment胜率评估
- risk_warning风险预警
- trade_recommendation交易建议
- analysis_logic分析逻辑
"""
return prompt
def _build_recommendation_prompt(self, analysis_result: Dict) -> str:
"""构建建议提示词"""
prompt = f"""# 期货交易建议生成任务
基于以下市场分析结果生成详细的交易建议
## 分析结果
{json.dumps(analysis_result, ensure_ascii=False, indent=2)}
## 建议要求
1. **明确的交易方向**做多/做空/观望
2. **具体的入场点位**基于技术分析的合理入场点
3. **严格的止损设置**基于ATR的动态止损
4. **合理的止盈目标**基于压力支撑位的目标价
5. **科学的仓位管理**基于账户资金的风险控制
6. **详细的执行计划**包括入场时机加仓策略出场条件
7. **风险提示**潜在的风险因素和应对措施
请以JSON格式输出交易建议包含以下字段
- direction交易方向
- entry_price入场价格
- stop_loss止损价格
- target_price目标价格
- position_size仓位大小
- execution_plan执行计划
- risk_tips风险提示
"""
return prompt
def _call_deepseek_api(self, prompt: str) -> str:
"""调用DeepSeek API"""
# 如果没有API密钥返回模拟结果
if not self.api_key:
return self._get_mock_response(prompt)
payload = {
'model': 'deepseek-chat',
'messages': [
{
'role': 'system',
'content': '你是一位专业的期货市场分析师,精通技术分析和基本面分析,能够基于多维度数据提供准确的市场研判和交易建议。'
},
{
'role': 'user',
'content': prompt
}
],
'temperature': 0.3,
'max_tokens': 2000,
'top_p': 0.9
}
try:
response = requests.post(
self.api_url,
headers=self.headers,
json=payload,
timeout=30
)
response.raise_for_status()
result = response.json()
return result['choices'][0]['message']['content']
except Exception as e:
print(f"API调用失败{e}")
return self._get_mock_response(prompt)
def _get_mock_response(self, prompt: str) -> str:
"""获取模拟响应"""
# 模拟分析结果
if '市场分析任务' in prompt:
return json.dumps({
'trend_judgment': '震荡偏多',
'win_rate_assessment': '65%',
'risk_warning': '短期波动较大,注意止损',
'trade_recommendation': '轻仓做多',
'analysis_logic': '基于MACD金叉、RSI中性偏多、双均线金叉等信号综合判断震荡偏多趋势'
}, ensure_ascii=False)
# 模拟建议结果
elif '交易建议生成任务' in prompt:
return json.dumps({
'direction': 'long',
'entry_price': 3500,
'stop_loss': 3450,
'target_price': 3600,
'position_size': 2,
'execution_plan': '回调至3480附近入场止损设置在3450目标位3600突破3600后可加仓',
'risk_tips': '若跌破3450立即止损若成交量萎缩考虑提前出场'
}, ensure_ascii=False)
else:
return json.dumps({
'error': '未知任务类型'
}, ensure_ascii=False)
def _parse_analysis_result(self, response: str) -> Dict:
"""解析分析结果"""
try:
# 尝试直接解析JSON
return json.loads(response)
except json.JSONDecodeError:
# 如果不是纯JSON提取JSON部分
import re
json_match = re.search(r'\{[\s\S]*\}', response)
if json_match:
try:
return json.loads(json_match.group())
except json.JSONDecodeError:
return {'error': '解析失败'}
else:
return {'error': '无有效JSON'}
def _parse_recommendation_result(self, response: str) -> Dict:
"""解析建议结果"""
try:
# 尝试直接解析JSON
return json.loads(response)
except json.JSONDecodeError:
# 如果不是纯JSON提取JSON部分
import re
json_match = re.search(r'\{[\s\S]*\}', response)
if json_match:
try:
return json.loads(json_match.group())
except json.JSONDecodeError:
return {'error': '解析失败'}
else:
return {'error': '无有效JSON'}
def fuse_multidimensional_data(self, data_sources: List[Dict]) -> Dict:
"""融合多维度数据"""
# 构建融合提示词
prompt = f"""# 多维度数据融合任务
请将以下多个数据源的数据进行融合分析提取关键信息形成综合的市场判断
## 数据源
{json.dumps(data_sources, ensure_ascii=False, indent=2)}
## 融合要求
1. **数据一致性检查**检查各数据源之间的一致性
2. **关键信息提取**提取各数据源的关键信息
3. **综合判断形成**基于融合数据形成综合市场判断
4. **不确定性评估**评估数据的不确定性和风险
请以JSON格式输出融合结果包含以下字段
- fused_data融合后的数据
- key_insights关键洞察
- comprehensive_judgment综合判断
- uncertainty_assessment不确定性评估
"""
# 调用API
response = self._call_deepseek_api(prompt)
# 解析结果
fused_result = self._parse_analysis_result(response)
return fused_result
def generate_market_insights(self, historical_data: List[Dict], current_data: Dict) -> Dict:
"""生成市场洞察"""
# 构建提示词
prompt = f"""# 市场洞察生成任务
基于以下历史数据和当前数据生成深度的市场洞察
## 历史数据
{json.dumps(historical_data, ensure_ascii=False, indent=2)}
## 当前数据
{json.dumps(current_data, ensure_ascii=False, indent=2)}
## 洞察要求
1. **趋势变化分析**分析市场趋势的变化
2. **关键转折点识别**识别重要的市场转折点
3. **异常情况检测**检测异常的市场行为
4. **未来走势预测**基于历史和当前数据预测未来走势
5. **投资机会挖掘**挖掘潜在的投资机会
请以JSON格式输出市场洞察包含以下字段
- trend_analysis趋势分析
- turning_points转折点分析
- anomalies异常检测
- future_prediction未来预测
- investment_opportunities投资机会
"""
# 调用API
response = self._call_deepseek_api(prompt)
# 解析结果
insights = self._parse_analysis_result(response)
return insights