You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

978 lines
39 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
# Flask web 应用
from flask import Flask, render_template, jsonify, request, redirect, url_for, flash, session
import sys
import os
# 添加项目根目录到 Python 路径
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# 导入认证模块
from auth import login_manager, init_db, register_user, login_user_by_credentials, logout_user, login_required, current_user
from qihuo_analyzer.data.data_fetcher import DataFetcher
from qihuo_analyzer.data.data_storage import DataStorage
from qihuo_analyzer.modules.trend_filter import TrendFilter
from qihuo_analyzer.modules.risk_manager import RiskManager
from qihuo_analyzer.modules.fund_flow_monitor import FundFlowMonitor
from qihuo_analyzer.modules.support_resistance import SupportResistance
from qihuo_analyzer.modules.rollover_detector import RolloverDetector
from qihuo_analyzer.modules.deepseek_agent import DeepseekAgent
from qihuo_analyzer.core.models import AnalysisResult
app = Flask(__name__)
# 设置Flask-Login
app.secret_key = 'your-secret-key-here' # 实际项目中应该使用环境变量
login_manager.init_app(app)
login_manager.login_view = 'login' # 设置登录页面的路由
# 初始化数据库
init_db()
# 模板上下文处理器
@app.context_processor
def inject_functions():
import datetime
def now():
return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
return {'now': now}
# 登录路由
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
success, message = login_user_by_credentials(username, password)
if success:
flash(message)
return redirect(url_for('index'))
else:
flash(message)
return render_template('login.html')
# 注册路由
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
success, message = register_user(username, password)
if success:
flash(message)
return redirect(url_for('login'))
else:
flash(message)
return render_template('register.html')
# 登出路由
@app.route('/logout')
@login_required
def logout():
logout_user()
flash('已登出')
return redirect(url_for('login'))
# 初始化组件
data_fetcher = DataFetcher()
data_storage = DataStorage()
trend_filter = TrendFilter()
risk_manager = RiskManager()
fund_flow_monitor = FundFlowMonitor()
support_resistance = SupportResistance()
rollover_detector = RolloverDetector()
deepseek_agent = DeepseekAgent()
# 连接API
data_fetcher.connect()
# 从SDK获取所有品种列表
test_symbols = data_fetcher.get_all_symbols()
# 自选品种列表(使用内存存储,实际项目中可以使用数据库或文件存储)
selected_symbols = []
# 热门品种获取功能
def get_hot_symbols(all_symbols_data):
"""获取热门交易品种
Args:
all_symbols_data: 所有品种的分析数据
Returns:
dict: 包含成交量、振幅、涨速三个模块的热门品种数据
"""
hot_symbols = {
'volume': [], # 成交量热门
'amplitude': [], # 振幅热门
'speed': [] # 涨速热门
}
# 计算每个品种的成交量、振幅、涨速
for symbol_data in all_symbols_data:
try:
# 这里简化处理,实际项目中应该使用真实的成交量、振幅、涨速数据
# 假设我们从kline_data中获取这些数据
# 由于我们没有这些数据,这里使用基于品种代码的哈希值生成值,确保每次计算结果一致
import hashlib
# 使用品种代码生成哈希值
hash_obj = hashlib.md5(symbol_data['symbol'].encode())
hash_int = int(hash_obj.hexdigest(), 16)
# 基于哈希值生成成交量、振幅、涨速
symbol_data['volume'] = 100000 + (hash_int % 900000) # 100000-1000000
symbol_data['amplitude'] = 0.1 + (hash_int % 100) / 25 # 0.1-4.1
symbol_data['speed'] = -2.0 + (hash_int % 100) / 25 # -2.0-2.0
except Exception:
pass
# 按成交量排序取前5个
hot_symbols['volume'] = sorted(all_symbols_data, key=lambda x: x.get('volume', 0), reverse=True)[:5]
# 按振幅排序取前5个
hot_symbols['amplitude'] = sorted(all_symbols_data, key=lambda x: x.get('amplitude', 0), reverse=True)[:5]
# 按涨速排序取前5个
hot_symbols['speed'] = sorted(all_symbols_data, key=lambda x: x.get('speed', 0), reverse=True)[:5]
return hot_symbols
@app.route('/')
def index():
"""首页 - 多品种分析面板"""
# 获取所有品种的分析数据
all_symbols_data = []
data_available = False
for symbol in test_symbols:
try:
# 获取K线数据
kline_data = data_fetcher.get_kline_data(symbol, "1d", 200)
if kline_data is None or kline_data.empty:
continue
data_available = True
# 趋势分析
trend_analysis = trend_filter.analyze_trend(kline_data)
win_rate = trend_filter.calculate_win_rate(kline_data)
cycle = trend_filter.judge_cycle(kline_data)
# 资金流向分析
fund_flow_analysis = fund_flow_monitor.analyze_fund_flow(kline_data)
# 换月分析
rollover_analysis = rollover_detector.analyze_rollover(symbol, kline_data)
# 价格数据
current_price = kline_data['close'].iloc[-1]
# 获取中文名称
product_name_cn = data_fetcher.get_product_name_cn(symbol)
# 转换周期为中文
def cycle_to_cn(cycle):
cycle_map = {
'short': '短期',
'medium': '中期',
'long': '长期',
'bullish': '多头',
'bearish': '空头',
'sideways': '震荡'
}
return cycle_map.get(cycle, cycle)
# 转换资金流向为中文
def fund_flow_to_cn(fund_flow):
fund_flow_map = {
'bullish': '多头',
'bearish': '空头',
'neutral': '中性',
'Strong_bullish': '强多头',
'Strong_bearish': '强空头'
}
return fund_flow_map.get(fund_flow, fund_flow)
# 构建数据
# 安全处理可能的NaN值
safe_current_price = 0
try:
if current_price is not None and (not isinstance(current_price, float) or current_price == current_price): # 不是NaN
safe_current_price = round(current_price, 2)
except (ValueError, TypeError):
safe_current_price = 0
safe_win_rate = 0
try:
if win_rate is not None and (not isinstance(win_rate, float) or win_rate == win_rate): # 不是NaN
safe_win_rate = round(win_rate, 1)
except (ValueError, TypeError):
safe_win_rate = 0
# 安全获取ADX值
safe_adx = 0
try:
adx = trend_analysis.get('adx', 0)
if adx is not None and (not isinstance(adx, float) or adx == adx): # 不是NaN
safe_adx = adx
except (ValueError, TypeError):
safe_adx = 0
symbol_data = {
'symbol': symbol,
'name': f"{product_name_cn}({symbol})".upper(),
'current_price': safe_current_price,
'direction': trend_analysis.get('overall_trend', 'sideways'),
'trend_strength': _get_trend_strength_display(safe_adx)
}
all_symbols_data.append(symbol_data)
except Exception as e:
print(f"分析 {symbol} 失败: {e}")
continue
# 获取自选品种的分析数据
selected_symbols_data = []
for symbol in selected_symbols:
try:
# 获取K线数据
kline_data = data_fetcher.get_kline_data(symbol, "1d", 200)
if kline_data is None or kline_data.empty:
continue
# 趋势分析
trend_analysis = trend_filter.analyze_trend(kline_data)
win_rate = trend_filter.calculate_win_rate(kline_data)
cycle = trend_filter.judge_cycle(kline_data)
# 资金流向分析
fund_flow_analysis = fund_flow_monitor.analyze_fund_flow(kline_data)
# 换月分析
rollover_analysis = rollover_detector.analyze_rollover(symbol, kline_data)
# 价格数据
current_price = kline_data['close'].iloc[-1]
# 获取中文名称
product_name_cn = data_fetcher.get_product_name_cn(symbol)
# 转换周期为中文
def cycle_to_cn(cycle):
cycle_map = {
'short': '短期',
'medium': '中期',
'long': '长期',
'bullish': '多头',
'bearish': '空头',
'sideways': '震荡'
}
return cycle_map.get(cycle, cycle)
# 转换资金流向为中文
def fund_flow_to_cn(fund_flow):
fund_flow_map = {
'bullish': '多头',
'bearish': '空头',
'neutral': '中性',
'Strong_bullish': '强多头',
'Strong_bearish': '强空头'
}
return fund_flow_map.get(fund_flow, fund_flow)
# 构建数据
# 安全处理可能的NaN值
safe_current_price = 0
try:
if current_price is not None and (not isinstance(current_price, float) or current_price == current_price): # 不是NaN
safe_current_price = round(current_price, 2)
except (ValueError, TypeError):
safe_current_price = 0
safe_win_rate = 0
try:
if win_rate is not None and (not isinstance(win_rate, float) or win_rate == win_rate): # 不是NaN
safe_win_rate = round(win_rate, 1)
except (ValueError, TypeError):
safe_win_rate = 0
# 安全获取ADX值
safe_adx = 0
try:
adx = trend_analysis.get('adx', 0)
if adx is not None and (not isinstance(adx, float) or adx == adx): # 不是NaN
safe_adx = adx
except (ValueError, TypeError):
safe_adx = 0
# 计算胜率(模拟数据,实际项目中应该使用真实的胜率计算)
win_rate = round(win_rate, 1) if win_rate is not None else 0
# 计算换月预警(模拟数据,实际项目中应该使用真实的距离交割天数)
rollover_warning = _get_rollover_warning(rollover_analysis)
# 计算主力资金流向(模拟数据,实际项目中应该使用真实的资金流向数据)
fund_flow = fund_flow_to_cn(fund_flow_analysis.get('fund_signal', 'neutral'))
# 计算周期(模拟数据,实际项目中应该使用真实的周期判断)
cycle = cycle_to_cn(cycle)
symbol_data = {
'symbol': symbol,
'name': f"{product_name_cn}({symbol})".upper(),
'current_price': safe_current_price,
'direction': trend_analysis.get('overall_trend', 'sideways'),
'win_rate': win_rate,
'trend_strength': _get_trend_strength_display(safe_adx),
'cycle': cycle,
'rollover_warning': rollover_warning,
'fund_flow': fund_flow
}
selected_symbols_data.append(symbol_data)
except Exception as e:
print(f"分析自选品种 {symbol} 失败: {e}")
continue
# 获取热门品种数据
hot_symbols = get_hot_symbols(all_symbols_data)
# 如果没有任何数据可用,显示友好提示
if not data_available:
return render_template('index.html',
all_symbols_data=[],
selected_symbols_data=[],
hot_symbols={},
data_unavailable=True,
message="无法获取真实市场数据请检查网络连接和TQSDK账号状态")
return render_template('index.html',
all_symbols_data=all_symbols_data,
selected_symbols_data=selected_symbols_data,
hot_symbols=hot_symbols,
data_unavailable=False)
@app.route('/symbol/<symbol>')
def symbol_detail(symbol):
"""品种详情页"""
try:
# 获取模型选择参数默认为deepseek
model_name = request.args.get('model', 'deepseek')
# 辅助函数:安全地四舍五入数字
def safe_round(value, decimals=2, context=''):
try:
if context:
print(f"[DEBUG] safe_round called with value: {value}, type: {type(value)}, context: {context}")
if value is None:
print(f"[DEBUG] safe_round returning 0 for None value, context: {context}")
return 0
# 检查是否为NaN
if isinstance(value, float) and value != value: # NaN检查
print(f"[DEBUG] safe_round returning 0 for NaN value, context: {context}")
return 0
# 检查是否为undefined
if value == 'undefined' or value == 'nan' or value == 'None':
print(f"[DEBUG] safe_round returning 0 for undefined-like value, context: {context}")
return 0
result = round(float(value), decimals)
if context:
print(f"[DEBUG] safe_round returning {result} for value: {value}, context: {context}")
return result
except (ValueError, TypeError) as e:
print(f"[DEBUG] safe_round exception: {e}, value: {value}, type: {type(value)}, context: {context}")
return 0
# 获取K线数据
kline_data = data_fetcher.get_kline_data(symbol, "1d", 200)
if kline_data is None or kline_data.empty:
return render_template('error.html', message="获取数据失败")
# 基础数据
try:
current_price = kline_data['close'].iloc[-1]
# 检查current_price是否为有效数字
if current_price is None or (isinstance(current_price, float) and current_price != current_price): # NaN检查
return render_template('error.html', message="获取数据失败:价格数据无效")
except (ValueError, TypeError, IndexError):
return render_template('error.html', message="获取数据失败:价格数据无效")
try:
price_change = kline_data['close'].iloc[-1] - kline_data['close'].iloc[-2]
price_change_pct = (price_change / kline_data['close'].iloc[-2]) * 100
except (ValueError, TypeError, IndexError):
price_change = 0
price_change_pct = 0
# 趋势分析
trend_analysis = trend_filter.analyze_trend(kline_data)
win_rate = trend_filter.calculate_win_rate(kline_data)
cycle = trend_filter.judge_cycle(kline_data)
# 资金流向分析
fund_flow_analysis = fund_flow_monitor.analyze_fund_flow(kline_data)
# 压力支撑分析
sr_analysis = support_resistance.analyze_support_resistance(kline_data)
support_levels = sr_analysis['support_resistance_levels']['support_levels']
resistance_levels = sr_analysis['support_resistance_levels']['resistance_levels']
# 风险分析
atr = trend_analysis.get('atr', 20)
stop_loss_long = risk_manager.calculate_stop_loss(kline_data, current_price, "long")
stop_loss_short = risk_manager.calculate_stop_loss(kline_data, current_price, "short")
# 仓位计算
account_balance = 1000000
position_info = risk_manager.calculate_position_size(account_balance, kline_data, "long", current_price)
# 换月分析
rollover_analysis = rollover_detector.analyze_rollover(symbol, kline_data)
# AI 分析
market_data = {
'symbol': symbol,
'latest_price': current_price,
'volume': kline_data['volume'].iloc[-1],
'open_interest': kline_data['open_interest'].iloc[-1],
'timeframe': '1d'
}
technical_indicators = {
'macd': {'signal': '金叉'},
'rsi': 55,
'bollinger': {'position': '中轨附近'},
'kdj': {'signal': '金叉'},
'atr': atr
}
# 安全获取趋势分析数据处理可能的NaN值
def safe_get_trend_value(key, default=0):
value = trend_analysis.get(key, default)
try:
if value is None:
return default
if isinstance(value, float) and value != value: # NaN检查
return default
return value
except (ValueError, TypeError):
return default
# 打印趋势分析数据
print(f"[DEBUG] trend_analysis: {trend_analysis}")
print(f"[DEBUG] win_rate: {win_rate}, type: {type(win_rate)}")
print(f"[DEBUG] atr: {atr}, type: {type(atr)}")
print(f"[DEBUG] stop_loss_long: {stop_loss_long}, type: {type(stop_loss_long)}")
print(f"[DEBUG] stop_loss_short: {stop_loss_short}, type: {type(stop_loss_short)}")
print(f"[DEBUG] position_info: {position_info}")
trend_data = {
'adx': safe_get_trend_value('adx'),
'trend_strength': safe_get_trend_value('trend_strength', 'none'),
'trend_direction': safe_get_trend_value('trend_direction', 'neutral'),
'ma_relationship': safe_get_trend_value('ma_relationship', 'neutral'),
'overall_trend': safe_get_trend_value('overall_trend', 'neutral'),
'win_rate': safe_round(win_rate, 1, context='trend_data_win_rate') if win_rate is not None else 0
}
# 安全获取止损和目标价格
safe_stop_loss = safe_round(stop_loss_long, 2, context='safe_stop_loss') if stop_loss_long is not None else 0
safe_target_price = 0
if resistance_levels:
try:
target = resistance_levels[0]
print(f"[DEBUG] resistance_levels[0]: {target}, type: {type(target)}")
if target is not None and (not isinstance(target, float) or target == target): # 不是NaN
safe_target_price = safe_round(target, 2, context='safe_target_price_from_resistance')
else:
safe_target_price = safe_round(current_price * 1.05, 2, context='safe_target_price_from_current')
except (ValueError, TypeError, IndexError) as e:
print(f"[DEBUG] Error getting target price: {e}")
safe_target_price = safe_round(current_price * 1.05, 2, context='safe_target_price_default')
else:
safe_target_price = safe_round(current_price * 1.05, 2, context='safe_target_price_no_resistance')
risk_metrics = {
'stop_loss': safe_stop_loss,
'target_price': safe_target_price,
'profit_loss_ratio': 1.8,
'position_size': position_info.get('suggested_units', 0),
'risk_ratio': safe_round(position_info.get('actual_risk_percent', 0) * 100, 2, context='risk_metrics_risk_ratio')
}
# AI 分析 - 使用指定的模型
ai_agent = DeepseekAgent(model_name=model_name)
ai_analysis = ai_agent.analyze_market(market_data, technical_indicators, trend_data, risk_metrics)
recommendation = ai_agent.generate_trade_recommendation(ai_analysis, market_data)
# 构建模板数据
# 获取中文名称
product_name_cn = data_fetcher.get_product_name_cn(symbol)
print(f"[DEBUG] product_name_cn: {product_name_cn}")
# 转换周期为中文
def cycle_to_cn(cycle):
cycle_map = {
'short': '短期',
'medium': '中期',
'long': '长期',
'bullish': '多头',
'bearish': '空头',
'sideways': '震荡'
}
return cycle_map.get(cycle, cycle)
# 转换趋势方向为中文
def direction_to_cn(direction):
direction_map = {
'bullish': '多头',
'bearish': '空头',
'sideways': '震荡'
}
return direction_map.get(direction, direction)
# 转换资金流向为中文
def fund_flow_to_cn(fund_flow):
fund_flow_map = {
'bullish': '多头',
'bearish': '空头',
'neutral': '中性',
'Strong_bullish': '强多头',
'Strong_bearish': '强空头',
'strong_increasing': '强劲增加',
'increasing': '增加',
'strong_decreasing': '强劲减少',
'decreasing': '减少',
'stable': '稳定',
'price_up_oi_up': '价涨量增',
'price_up_oi_down': '价涨量减',
'price_down_oi_up': '价跌量增',
'price_down_oi_down': '价跌量减',
'bullish_divergence': '看涨背离',
'bearish_divergence': '看跌背离',
'no_divergence': '无背离'
}
return fund_flow_map.get(fund_flow, fund_flow)
print(f"[DEBUG] current_price: {current_price}, type: {type(current_price)}")
print(f"[DEBUG] price_change: {price_change}, type: {type(price_change)}")
print(f"[DEBUG] price_change_pct: {price_change_pct}, type: {type(price_change_pct)}")
print(f"[DEBUG] cycle: {cycle}, type: {type(cycle)}")
context = {
'symbol': symbol,
'name': f"{product_name_cn}({symbol})".upper(),
'current_price': safe_round(current_price, 2, context='context_current_price'),
'price_change': safe_round(price_change, 2, context='context_price_change'),
'price_change_pct': safe_round(price_change_pct, 2, context='context_price_change_pct'),
'trend_analysis': trend_analysis,
'win_rate': safe_round(win_rate, 1, context='context_win_rate'),
'cycle': cycle_to_cn(cycle),
'fund_flow_analysis': fund_flow_analysis,
'sr_analysis': sr_analysis,
'support_levels': support_levels,
'resistance_levels': resistance_levels,
'risk_analysis': {
'atr': safe_round(atr, 2, context='risk_analysis_atr'),
'stop_loss_long': safe_round(stop_loss_long, 2, context='risk_analysis_stop_loss_long'),
'stop_loss_short': safe_round(stop_loss_short, 2, context='risk_analysis_stop_loss_short'),
'position_size': position_info.get('suggested_units', 0),
'risk_ratio': safe_round(position_info.get('actual_risk_percent', 0) * 100, 2, context='risk_analysis_risk_ratio'),
'leverage': safe_round(position_info.get('leverage', 0), 2, context='risk_analysis_leverage')
},
'rollover_analysis': rollover_analysis,
'ai_analysis': ai_analysis,
'recommendation': recommendation,
'kline_data': _get_kline_data_for_chart(kline_data),
'is_favorite': symbol in selected_symbols,
'model_name': model_name,
'available_models': ['deepseek', 'gpt', 'gemini']
}
print("[DEBUG] context built successfully")
return render_template('symbol_detail.html', **context)
except Exception as e:
print(f"分析 {symbol} 详情失败: {e}")
return render_template('error.html', message=f"分析失败: {str(e)}")
@app.route('/api/analysis/<symbol>')
def api_analysis(symbol):
"""API: 获取品种分析数据"""
try:
# 辅助函数:安全地四舍五入数字
def safe_round(value, decimals=2):
try:
if value is None:
return 0
# 检查是否为NaN
if isinstance(value, float) and value != value: # NaN检查
return 0
return round(float(value), decimals)
except (ValueError, TypeError):
return 0
# 获取K线数据
kline_data = data_fetcher.get_kline_data(symbol, "1d", 200)
if kline_data.empty:
return jsonify({"error": "获取数据失败"}), 400
# 趋势分析
trend_analysis = trend_filter.analyze_trend(kline_data)
win_rate = trend_filter.calculate_win_rate(kline_data)
cycle = trend_filter.judge_cycle(kline_data)
# 资金流向分析
fund_flow_analysis = fund_flow_monitor.analyze_fund_flow(kline_data)
# 压力支撑分析
sr_analysis = support_resistance.analyze_support_resistance(kline_data)
# 换月分析
rollover_analysis = rollover_detector.analyze_rollover(symbol, kline_data)
# 价格数据
current_price = kline_data['close'].iloc[-1]
# 构建响应
response = {
'symbol': symbol,
'current_price': safe_round(current_price, 2),
'trend_analysis': trend_analysis,
'win_rate': safe_round(win_rate, 1),
'cycle': cycle,
'fund_flow_analysis': fund_flow_analysis,
'sr_analysis': sr_analysis,
'rollover_analysis': rollover_analysis
}
return jsonify(response)
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/api/card/<symbol>')
def api_card(symbol):
"""API: 获取分析卡片数据"""
try:
# 辅助函数:安全地四舍五入数字
def safe_round(value, decimals=2):
try:
if value is None:
return 0
# 检查是否为NaN
if isinstance(value, float) and value != value: # NaN检查
return 0
return round(float(value), decimals)
except (ValueError, TypeError):
return 0
# 获取K线数据
kline_data = data_fetcher.get_kline_data(symbol, "1d", 200)
if kline_data.empty:
return jsonify({"error": "获取数据失败"}), 400
# 分析数据
trend_analysis = trend_filter.analyze_trend(kline_data)
win_rate = trend_filter.calculate_win_rate(kline_data)
fund_flow_analysis = fund_flow_monitor.analyze_fund_flow(kline_data)
sr_analysis = support_resistance.analyze_support_resistance(kline_data)
rollover_analysis = rollover_detector.analyze_rollover(symbol, kline_data)
# AI 分析
current_price = kline_data['close'].iloc[-1]
market_data = {
'symbol': symbol,
'latest_price': current_price,
'volume': kline_data['volume'].iloc[-1],
'open_interest': kline_data['open_interest'].iloc[-1],
'timeframe': '1d'
}
technical_indicators = {
'macd': {'signal': '金叉'},
'rsi': 55,
'bollinger': {'position': '中轨附近'},
'kdj': {'signal': '金叉'},
'atr': trend_analysis.get('atr', 20)
}
# 安全获取趋势分析数据处理可能的NaN值
def safe_get_trend_value(key, default=0):
value = trend_analysis.get(key, default)
try:
if value is None:
return default
if isinstance(value, float) and value != value: # NaN检查
return default
return value
except (ValueError, TypeError):
return default
trend_data = {
'adx': safe_get_trend_value('adx'),
'trend_strength': safe_get_trend_value('trend_strength', 'none'),
'trend_direction': safe_get_trend_value('trend_direction', 'neutral'),
'ma_relationship': safe_get_trend_value('ma_relationship', 'neutral'),
'overall_trend': safe_get_trend_value('overall_trend', 'neutral'),
'win_rate': safe_round(win_rate, 1) if win_rate is not None else 0
}
# 安全获取止损和目标价格
safe_stop_loss = 0
try:
stop_loss = risk_manager.calculate_stop_loss(kline_data, current_price, "long")
safe_stop_loss = safe_round(stop_loss, 2) if stop_loss is not None else 0
except Exception:
safe_stop_loss = 0
safe_target_price = 0
try:
resistance_levels = sr_analysis.get('support_resistance_levels', {}).get('resistance_levels', [])
if resistance_levels:
target = resistance_levels[0]
if target is not None and (not isinstance(target, float) or target == target): # 不是NaN
safe_target_price = safe_round(target, 2)
else:
safe_target_price = safe_round(current_price * 1.05, 2)
else:
safe_target_price = safe_round(current_price * 1.05, 2)
except Exception:
safe_target_price = safe_round(current_price * 1.05, 2)
risk_metrics = {
'stop_loss': safe_stop_loss,
'target_price': safe_target_price,
'profit_loss_ratio': 1.8,
'position_size': 2,
'risk_ratio': 2.5
}
ai_analysis = deepseek_agent.analyze_market(market_data, technical_indicators, trend_data, risk_metrics)
recommendation = deepseek_agent.generate_trade_recommendation(ai_analysis, market_data)
# 构建卡片数据
# 获取中文名称
product_name_cn = data_fetcher.get_product_name_cn(symbol)
# 转换趋势方向为中文
def direction_to_cn(direction):
direction_map = {
'bullish': '多头',
'bearish': '空头',
'sideways': '震荡'
}
return direction_map.get(direction, direction)
card_data = {
'symbol': symbol,
'name': f"{product_name_cn}({symbol})".upper(),
'current_price': safe_round(current_price, 2),
'direction': direction_to_cn(trend_analysis.get('overall_trend', 'sideways')),
'win_rate': safe_round(win_rate, 1),
'recommendation': recommendation,
'technical_indicators': technical_indicators,
'fund_flow': fund_flow_analysis,
'rollover_warning': rollover_analysis
}
return jsonify(card_data)
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/card/<symbol>')
def card(symbol):
"""分析卡片页面"""
try:
# 获取卡片数据
import requests
response = requests.get(f"http://localhost:5000/api/card/{symbol}")
card_data = response.json()
if 'error' in card_data:
return render_template('error.html', message=card_data['error'])
return render_template('card.html', **card_data)
except Exception as e:
print(f"获取卡片数据失败: {e}")
return render_template('error.html', message=f"获取卡片数据失败: {str(e)}")
def _get_trend_strength_display(adx):
"""获取趋势强度显示文本"""
try:
adx_value = float(adx) if adx is not None else 0
if adx_value > 40:
return f"强势多头({round(adx_value, 1)})"
elif adx_value > 25:
return f"多头({round(adx_value, 1)})"
elif adx_value > 20:
return f"弱势多头({round(adx_value, 1)})"
else:
return f"震荡({round(adx_value, 1)})"
except (ValueError, TypeError):
return "震荡(0.0)"
def _get_rollover_warning(rollover_analysis):
"""获取换月预警信息"""
try:
days = rollover_analysis.get('days_to_delivery', 0)
level = rollover_analysis.get('warning_level', 'low')
try:
days_value = int(days) if days is not None else 0
except (ValueError, TypeError):
days_value = 0
if level == 'critical':
return f"⚠️ {abs(days_value)}"
elif level == 'high':
return f"⚠️ {days_value}"
elif level == 'medium':
return f"⚠️ {days_value}"
else:
return f"{days_value}"
except (ValueError, TypeError):
return "✅ 0 天"
def _get_kline_data_for_chart(kline_data):
"""获取图表用的K线数据"""
try:
chart_data = []
if kline_data is not None and not kline_data.empty:
for idx, row in kline_data.tail(30).iterrows():
try:
chart_data.append({
'date': idx.strftime('%Y-%m-%d') if hasattr(idx, 'strftime') else '2023-01-01',
'open': float(row.get('open', 0)),
'high': float(row.get('high', 0)),
'low': float(row.get('low', 0)),
'close': float(row.get('close', 0)),
'volume': float(row.get('volume', 0))
})
except (ValueError, TypeError):
continue
return chart_data
except (ValueError, TypeError):
return []
@app.route('/api/selected/add/<symbol>', methods=['POST'])
def add_selected_symbol(symbol):
"""添加自选品种"""
global selected_symbols
if symbol not in selected_symbols:
selected_symbols.append(symbol)
return jsonify({'status': 'success', 'selected_symbols': selected_symbols})
@app.route('/api/selected/remove/<symbol>', methods=['POST'])
def remove_selected_symbol(symbol):
"""删除自选品种"""
global selected_symbols
if symbol in selected_symbols:
selected_symbols.remove(symbol)
return jsonify({'status': 'success', 'selected_symbols': selected_symbols})
@app.route('/api/selected/list')
def get_selected_symbols():
"""获取自选品种列表"""
return jsonify({'selected_symbols': selected_symbols})
@app.route('/config', methods=['GET', 'POST'])
def config():
"""配置页面"""
if request.method == 'POST':
# 保存配置
try:
# 读取表单数据
data_adapter_type = request.form.get('data_adapter_type', 'tqsdk')
tqsdk_username = request.form.get('tqsdk_username', '')
tqsdk_password = request.form.get('tqsdk_password', '')
rqdata_username = request.form.get('rqdata_username', '')
rqdata_password = request.form.get('rqdata_password', '')
debug = request.form.get('debug', 'false') == 'true'
enable_cache = request.form.get('enable_cache', 'true') == 'true'
cache_expiry = request.form.get('cache_expiry', '3600')
default_ai_model = request.form.get('default_ai_model', 'deepseek')
ai_api_key = request.form.get('ai_api_key', '')
# 写入配置文件
config_content = f"""# 数据源配置文件
# 统一数据获取接口的配置参数
# 数据源类型配置
# 可选值: tqsdk, rqdata
DATA_ADAPTER_TYPE = "{data_adapter_type}"
# TQSDK账号配置可选
# 未配置时会使用模拟数据
TQSDK_USERNAME = "{tqsdk_username}"
TQSDK_PASSWORD = "{tqsdk_password}"
# RQData账号配置可选
# 未配置时会使用模拟数据
RQDATA_USERNAME = "{rqdata_username}"
RQDATA_PASSWORD = "{rqdata_password}"
# 调试模式配置
# 设置为True时启用详细日志
DEBUG = {debug}
# 数据源连接超时设置(秒)
CONNECTION_TIMEOUT = 30
# 数据缓存配置
# 设置为True时启用数据缓存提高重复查询性能
ENABLE_CACHE = {enable_cache}
# 缓存过期时间(秒)
CACHE_EXPIRY = {cache_expiry}
# AI模型配置
# 默认AI模型
DEFAULT_AI_MODEL = "{default_ai_model}"
# AI模型API Key
AI_API_KEY = "{ai_api_key}"
"""
with open(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'config.py'), 'w', encoding='utf-8') as f:
f.write(config_content)
flash('配置保存成功')
return redirect(url_for('config'))
except Exception as e:
flash(f'配置保存失败: {str(e)}')
# 读取配置
try:
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import config
config_data = {
'data_adapter_type': getattr(config, 'DATA_ADAPTER_TYPE', 'tqsdk'),
'tqsdk_username': getattr(config, 'TQSDK_USERNAME', ''),
'tqsdk_password': getattr(config, 'TQSDK_PASSWORD', ''),
'rqdata_username': getattr(config, 'RQDATA_USERNAME', ''),
'rqdata_password': getattr(config, 'RQDATA_PASSWORD', ''),
'debug': getattr(config, 'DEBUG', False),
'enable_cache': getattr(config, 'ENABLE_CACHE', True),
'cache_expiry': getattr(config, 'CACHE_EXPIRY', 3600),
'default_ai_model': getattr(config, 'DEFAULT_AI_MODEL', 'deepseek'),
'ai_api_key': getattr(config, 'AI_API_KEY', '')
}
except Exception as e:
flash(f'读取配置失败: {str(e)}')
config_data = {
'data_adapter_type': 'tqsdk',
'tqsdk_username': '',
'tqsdk_password': '',
'rqdata_username': '',
'rqdata_password': '',
'debug': False,
'enable_cache': True,
'cache_expiry': 3600,
'default_ai_model': 'deepseek',
'ai_api_key': ''
}
return render_template('config.html', **config_data)
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000)