You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

805 lines
33 KiB

#!/usr/bin/env python3
# Flask web application
from flask import Flask, render_template, jsonify, request
import sys
import os
# Add the project root directory (the parent of this file's directory) to the Python path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from qihuo_analyzer.data.data_fetcher import DataFetcher
from qihuo_analyzer.data.data_storage import DataStorage
from qihuo_analyzer.modules.trend_filter import TrendFilter
from qihuo_analyzer.modules.risk_manager import RiskManager
from qihuo_analyzer.modules.fund_flow_monitor import FundFlowMonitor
from qihuo_analyzer.modules.support_resistance import SupportResistance
from qihuo_analyzer.modules.rollover_detector import RolloverDetector
from qihuo_analyzer.modules.deepseek_agent import DeepseekAgent
from qihuo_analyzer.core.models import AnalysisResult
app = Flask(__name__)
# Template context processor
@app.context_processor
def inject_functions():
    """Make a ``now()`` helper available inside every rendered template.

    Returns:
        dict: maps the template name ``now`` to a callable that returns the
        current local time formatted as ``YYYY-MM-DD HH:MM:SS``.
    """
    from datetime import datetime as _datetime

    def _current_timestamp():
        moment = _datetime.now()
        return moment.strftime('%Y-%m-%d %H:%M:%S')

    return dict(now=_current_timestamp)
# Initialise the shared components (module-level singletons used by every route)
data_fetcher = DataFetcher()
data_storage = DataStorage()
trend_filter = TrendFilter()
risk_manager = RiskManager()
fund_flow_monitor = FundFlowMonitor()
support_resistance = SupportResistance()
rollover_detector = RolloverDetector()
deepseek_agent = DeepseekAgent()
# Connect to the API (runs at import time, before the symbol list is requested)
data_fetcher.connect()
# Fetch the list of all symbols from the SDK
test_symbols = data_fetcher.get_all_symbols()
# Watch-list of user-selected symbols (kept in memory; a real project could use a database or file storage)
selected_symbols = []
# Hot-symbol ranking
def get_hot_symbols(all_symbols_data, top_n=5):
    """Rank symbols by volume, amplitude and price speed.

    Real market metrics are not wired in yet, so whichever of the
    ``volume``, ``amplitude`` and ``speed`` keys is missing (or ``None``) on
    an entry is filled with a random placeholder. Values the caller already
    supplied are kept, so real data takes precedence over the simulation.

    Args:
        all_symbols_data: list of per-symbol dicts. Entries are updated in
            place with any placeholder metrics that had to be generated.
        top_n: number of entries to keep in each ranking (default 5).

    Returns:
        dict: keys ``'volume'``, ``'amplitude'`` and ``'speed'``, each mapped
        to the ``top_n`` entries with the highest value for that metric,
        in descending order.
    """
    import random

    # One generator per metric; ranges are the simulated value ranges.
    placeholder_generators = {
        'volume': lambda: random.randint(10000, 1000000),
        'amplitude': lambda: random.uniform(0.1, 5.0),
        'speed': lambda: random.uniform(-2.0, 2.0),
    }

    for symbol_data in all_symbols_data:
        for metric, generate in placeholder_generators.items():
            if symbol_data.get(metric) is None:
                symbol_data[metric] = generate()

    limit = max(top_n, 0)
    return {
        metric: sorted(
            all_symbols_data,
            key=lambda entry, metric=metric: entry.get(metric, 0),
            reverse=True,
        )[:limit]
        for metric in placeholder_generators
    }
def _index_clean_number(value, decimals=None):
    """Return *value* (rounded when *decimals* is given), or 0 if unusable.

    ``None``, NaN of any float type, and values that cannot be rounded all
    collapse to 0 so the template never receives an unprintable number.
    """
    if value is None:
        return 0
    try:
        if value != value:  # NaN is the only value that is unequal to itself
            return 0
        if decimals is None:
            return value
        return round(value, decimals)
    except (ValueError, TypeError):
        return 0


def _index_symbol_summary(symbol, kline_data):
    """Build the overview-row dict shown on the home page for one symbol.

    Args:
        symbol: symbol code as used by ``data_fetcher``.
        kline_data: non-empty K-line DataFrame for that symbol.

    Returns:
        dict: ``symbol``, ``name``, ``current_price``, ``direction`` and
        ``trend_strength`` entries.
    """
    trend_analysis = trend_filter.analyze_trend(kline_data)
    current_price = kline_data['close'].iloc[-1]
    product_name_cn = data_fetcher.get_product_name_cn(symbol)
    adx = _index_clean_number(trend_analysis.get('adx', 0))
    return {
        'symbol': symbol,
        'name': f"{product_name_cn}({symbol})".upper(),
        'current_price': _index_clean_number(current_price, 2),
        'direction': trend_analysis.get('overall_trend', 'sideways'),
        'trend_strength': _get_trend_strength_display(adx)
    }


@app.route('/')
def index():
    """Home page - multi-symbol analysis dashboard.

    Builds one summary row per symbol in ``test_symbols`` and per symbol in
    the ``selected_symbols`` watch-list, plus the hot-symbol rankings, and
    renders ``index.html``. A symbol whose data cannot be fetched or analysed
    is skipped. When no symbol returned any K-line data at all, the page is
    rendered with ``data_unavailable=True`` and an explanatory message.
    """
    all_symbols_data = []
    summaries_by_symbol = {}
    data_available = False
    for symbol in test_symbols:
        try:
            kline_data = data_fetcher.get_kline_data(symbol, "1d", 200)
            if kline_data is None or kline_data.empty:
                continue
            # Data counts as available as soon as any K-lines arrive, even if
            # the analysis of this particular symbol fails afterwards.
            data_available = True
            summary = _index_symbol_summary(symbol, kline_data)
        except Exception as e:
            print(f"分析 {symbol} 失败: {e}")
            continue
        all_symbols_data.append(summary)
        summaries_by_symbol[symbol] = summary

    selected_symbols_data = []
    for symbol in selected_symbols:
        cached = summaries_by_symbol.get(symbol)
        if cached is not None:
            # Copy so the placeholder metrics that get_hot_symbols() adds to
            # the main list do not show up on the watch-list rows.
            selected_symbols_data.append(dict(cached))
            continue
        try:
            kline_data = data_fetcher.get_kline_data(symbol, "1d", 200)
            if kline_data is None or kline_data.empty:
                continue
            selected_symbols_data.append(_index_symbol_summary(symbol, kline_data))
        except Exception as e:
            print(f"分析自选品种 {symbol} 失败: {e}")
            continue

    # Nothing could be fetched: show a friendly notice instead of empty tables.
    if not data_available:
        return render_template('index.html',
                               all_symbols_data=[],
                               selected_symbols_data=[],
                               hot_symbols={},
                               data_unavailable=True,
                               message="无法获取真实市场数据请检查网络连接和TQSDK账号状态")

    hot_symbols = get_hot_symbols(all_symbols_data)
    return render_template('index.html',
                           all_symbols_data=all_symbols_data,
                           selected_symbols_data=selected_symbols_data,
                           hot_symbols=hot_symbols,
                           data_unavailable=False)
# English cycle/trend codes -> Chinese labels shown on the detail page.
_DETAIL_CYCLE_LABELS = {
    'short': '短期',
    'medium': '中期',
    'long': '长期',
    'bullish': '多头',
    'bearish': '空头',
    'sideways': '震荡'
}


def _detail_safe_round(value, decimals=2):
    """Round *value* to *decimals* places; return 0 for None, NaN or non-numeric input."""
    try:
        number = float(value)
    except (ValueError, TypeError):
        return 0
    if number != number:  # NaN
        return 0
    return round(number, decimals)


def _detail_clean_value(value, default=0):
    """Return *value*, or *default* when it is None or a float NaN."""
    if value is None:
        return default
    if isinstance(value, float) and value != value:
        return default
    return value


def _detail_target_price(resistance_levels, current_price):
    """Pick the first resistance level as target price, else 5% above the current price."""
    if resistance_levels:
        try:
            target = float(resistance_levels[0])
        except (ValueError, TypeError, IndexError):
            target = None
        if target is not None and target == target:  # usable and not NaN
            return round(target, 2)
    return _detail_safe_round(current_price * 1.05, 2)


@app.route('/symbol/<symbol>')
def symbol_detail(symbol):
    """Symbol detail page.

    Fetches 200 daily K-lines for *symbol*, runs the trend, fund-flow,
    support/resistance, risk, rollover and AI analyses, and renders
    ``symbol_detail.html``. Renders ``error.html`` when the data is missing,
    the latest close is invalid, or any analysis step raises.

    Args:
        symbol: symbol code taken from the URL.
    """
    try:
        kline_data = data_fetcher.get_kline_data(symbol, "1d", 200)
        if kline_data is None or kline_data.empty:
            return render_template('error.html', message="获取数据失败")

        # Latest price; refuse to continue without a valid number.
        closes = kline_data['close']
        try:
            current_price = closes.iloc[-1]
            price_is_invalid = current_price is None or (
                isinstance(current_price, float) and current_price != current_price
            )
        except (ValueError, TypeError, IndexError):
            price_is_invalid = True
        if price_is_invalid:
            return render_template('error.html', message="获取数据失败:价格数据无效")

        # Day-over-day change; falls back to 0 with fewer than two bars.
        try:
            previous_close = closes.iloc[-2]
            price_change = current_price - previous_close
            # A zero previous close would yield an infinite percentage.
            price_change_pct = (price_change / previous_close) * 100 if previous_close else 0
        except (ValueError, TypeError, IndexError):
            price_change = 0
            price_change_pct = 0

        # Trend analysis
        trend_analysis = trend_filter.analyze_trend(kline_data)
        win_rate = trend_filter.calculate_win_rate(kline_data)
        cycle = trend_filter.judge_cycle(kline_data)
        # Fund-flow analysis
        fund_flow_analysis = fund_flow_monitor.analyze_fund_flow(kline_data)
        # Support / resistance analysis
        sr_analysis = support_resistance.analyze_support_resistance(kline_data)
        sr_levels = sr_analysis.get('support_resistance_levels', {})
        support_levels = sr_levels.get('support_levels', [])
        resistance_levels = sr_levels.get('resistance_levels', [])
        # Risk analysis
        atr = trend_analysis.get('atr', 20)
        stop_loss_long = risk_manager.calculate_stop_loss(kline_data, current_price, "long")
        stop_loss_short = risk_manager.calculate_stop_loss(kline_data, current_price, "short")
        # Position sizing against a fixed notional account balance
        account_balance = 1000000
        position_info = risk_manager.calculate_position_size(account_balance, kline_data, "long", current_price)
        # Rollover analysis
        rollover_analysis = rollover_detector.analyze_rollover(symbol, kline_data)

        app.logger.debug(
            "symbol_detail %s: trend_analysis=%s win_rate=%r atr=%r "
            "stop_loss_long=%r stop_loss_short=%r position_info=%s",
            symbol, trend_analysis, win_rate, atr,
            stop_loss_long, stop_loss_short, position_info,
        )

        # Inputs for the AI analysis
        market_data = {
            'symbol': symbol,
            'latest_price': current_price,
            'volume': kline_data['volume'].iloc[-1],
            'open_interest': kline_data['open_interest'].iloc[-1],
            'timeframe': '1d'
        }
        # Only ATR is computed here; the other indicator values are fixed placeholders.
        technical_indicators = {
            'macd': {'signal': '金叉'},
            'rsi': 55,
            'bollinger': {'position': '中轨附近'},
            'kdj': {'signal': '金叉'},
            'atr': atr
        }
        trend_data = {
            'adx': _detail_clean_value(trend_analysis.get('adx', 0), 0),
            'trend_strength': _detail_clean_value(trend_analysis.get('trend_strength', 'none'), 'none'),
            'trend_direction': _detail_clean_value(trend_analysis.get('trend_direction', 'neutral'), 'neutral'),
            'ma_relationship': _detail_clean_value(trend_analysis.get('ma_relationship', 'neutral'), 'neutral'),
            'overall_trend': _detail_clean_value(trend_analysis.get('overall_trend', 'neutral'), 'neutral'),
            'win_rate': _detail_safe_round(win_rate, 1)
        }

        position_size = position_info.get('suggested_units', 0)
        # ``or 0`` keeps a None risk percentage from breaking the multiplication.
        risk_ratio = _detail_safe_round((position_info.get('actual_risk_percent', 0) or 0) * 100, 2)
        risk_metrics = {
            'stop_loss': _detail_safe_round(stop_loss_long, 2),
            'target_price': _detail_target_price(resistance_levels, current_price),
            'profit_loss_ratio': 1.8,
            'position_size': position_size,
            'risk_ratio': risk_ratio
        }

        # AI analysis
        ai_analysis = deepseek_agent.analyze_market(market_data, technical_indicators, trend_data, risk_metrics)
        recommendation = deepseek_agent.generate_trade_recommendation(ai_analysis)

        # Chinese product name for the page title
        product_name_cn = data_fetcher.get_product_name_cn(symbol)
        app.logger.debug(
            "symbol_detail %s: name=%r price=%r change=%r change_pct=%r cycle=%r",
            symbol, product_name_cn, current_price, price_change, price_change_pct, cycle,
        )

        context = {
            'symbol': symbol,
            'name': f"{product_name_cn}({symbol})".upper(),
            'current_price': _detail_safe_round(current_price, 2),
            'price_change': _detail_safe_round(price_change, 2),
            'price_change_pct': _detail_safe_round(price_change_pct, 2),
            'trend_analysis': trend_analysis,
            'win_rate': _detail_safe_round(win_rate, 1),
            # Unknown codes are shown unchanged.
            'cycle': _DETAIL_CYCLE_LABELS.get(cycle, cycle),
            'fund_flow_analysis': fund_flow_analysis,
            'sr_analysis': sr_analysis,
            'support_levels': support_levels,
            'resistance_levels': resistance_levels,
            'risk_analysis': {
                'atr': _detail_safe_round(atr, 2),
                'stop_loss_long': _detail_safe_round(stop_loss_long, 2),
                'stop_loss_short': _detail_safe_round(stop_loss_short, 2),
                'position_size': position_size,
                'risk_ratio': risk_ratio,
                'leverage': _detail_safe_round(position_info.get('leverage', 0), 2)
            },
            'rollover_analysis': rollover_analysis,
            'ai_analysis': ai_analysis,
            'recommendation': recommendation,
            'kline_data': _get_kline_data_for_chart(kline_data),
            'is_favorite': symbol in selected_symbols
        }
        return render_template('symbol_detail.html', **context)
    except Exception as e:
        # Top-level boundary for the page: log with traceback, show the error page.
        app.logger.exception(f"分析 {symbol} 详情失败: {e}")
        return render_template('error.html', message=f"分析失败: {str(e)}")
@app.route('/api/analysis/<symbol>')
def api_analysis(symbol):
    """API: return the analysis data for one symbol as JSON.

    Args:
        symbol: symbol code taken from the URL.

    Returns:
        A JSON response with the current price, trend analysis, win rate,
        cycle, fund-flow, support/resistance and rollover analyses.
        Responds with status 400 and an ``error`` field when no K-line data
        could be fetched, and with status 500 when any analysis step raises.
    """
    def safe_round(value, decimals=2):
        """Round to *decimals* places; 0 for None, NaN or non-numeric input."""
        try:
            number = float(value)
        except (ValueError, TypeError):
            return 0
        if number != number:  # NaN is not valid JSON
            return 0
        return round(number, decimals)

    try:
        # Fetch K-line data; the fetcher may hand back None as well as an
        # empty frame, and both mean "no data".
        kline_data = data_fetcher.get_kline_data(symbol, "1d", 200)
        if kline_data is None or kline_data.empty:
            return jsonify({"error": "获取数据失败"}), 400
        # Trend analysis
        trend_analysis = trend_filter.analyze_trend(kline_data)
        win_rate = trend_filter.calculate_win_rate(kline_data)
        cycle = trend_filter.judge_cycle(kline_data)
        # Fund-flow analysis
        fund_flow_analysis = fund_flow_monitor.analyze_fund_flow(kline_data)
        # Support / resistance analysis
        sr_analysis = support_resistance.analyze_support_resistance(kline_data)
        # Rollover analysis
        rollover_analysis = rollover_detector.analyze_rollover(symbol, kline_data)
        # Latest close
        current_price = kline_data['close'].iloc[-1]
        response = {
            'symbol': symbol,
            'current_price': safe_round(current_price, 2),
            'trend_analysis': trend_analysis,
            'win_rate': safe_round(win_rate, 1),
            'cycle': cycle,
            'fund_flow_analysis': fund_flow_analysis,
            'sr_analysis': sr_analysis,
            'rollover_analysis': rollover_analysis
        }
        return jsonify(response)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/api/card/<symbol>')
def api_card(symbol):
    """API: return the analysis-card data for one symbol as JSON.

    Args:
        symbol: symbol code taken from the URL.

    Returns:
        A JSON response with the display name, current price, trend
        direction (Chinese label), win rate, AI recommendation, technical
        indicators, fund-flow analysis and rollover analysis.
        Responds with status 400 and an ``error`` field when no K-line data
        could be fetched, and with status 500 when any step raises.
    """
    def safe_round(value, decimals=2):
        """Round to *decimals* places; 0 for None, NaN or non-numeric input."""
        try:
            number = float(value)
        except (ValueError, TypeError):
            return 0
        if number != number:  # NaN is not valid JSON
            return 0
        return round(number, decimals)

    def clean_value(value, default=0):
        """Return *value*, or *default* when it is None or a float NaN."""
        if value is None:
            return default
        if isinstance(value, float) and value != value:
            return default
        return value

    try:
        # Fetch K-line data; None and an empty frame both mean "no data".
        kline_data = data_fetcher.get_kline_data(symbol, "1d", 200)
        if kline_data is None or kline_data.empty:
            return jsonify({"error": "获取数据失败"}), 400
        # Analyses
        trend_analysis = trend_filter.analyze_trend(kline_data)
        win_rate = trend_filter.calculate_win_rate(kline_data)
        fund_flow_analysis = fund_flow_monitor.analyze_fund_flow(kline_data)
        sr_analysis = support_resistance.analyze_support_resistance(kline_data)
        rollover_analysis = rollover_detector.analyze_rollover(symbol, kline_data)
        # Inputs for the AI analysis
        current_price = kline_data['close'].iloc[-1]
        market_data = {
            'symbol': symbol,
            'latest_price': current_price,
            'volume': kline_data['volume'].iloc[-1],
            'open_interest': kline_data['open_interest'].iloc[-1],
            'timeframe': '1d'
        }
        # Only ATR comes from the trend analysis; the rest are fixed placeholders.
        technical_indicators = {
            'macd': {'signal': '金叉'},
            'rsi': 55,
            'bollinger': {'position': '中轨附近'},
            'kdj': {'signal': '金叉'},
            'atr': trend_analysis.get('atr', 20)
        }
        trend_data = {
            'adx': clean_value(trend_analysis.get('adx', 0), 0),
            'trend_strength': clean_value(trend_analysis.get('trend_strength', 'none'), 'none'),
            'trend_direction': clean_value(trend_analysis.get('trend_direction', 'neutral'), 'neutral'),
            'ma_relationship': clean_value(trend_analysis.get('ma_relationship', 'neutral'), 'neutral'),
            'overall_trend': clean_value(trend_analysis.get('overall_trend', 'neutral'), 'neutral'),
            'win_rate': safe_round(win_rate, 1)
        }

        # Stop loss is best-effort: a failure here must not fail the card.
        try:
            stop_loss = risk_manager.calculate_stop_loss(kline_data, current_price, "long")
            safe_stop_loss = safe_round(stop_loss, 2)
        except Exception:
            safe_stop_loss = 0

        # Target price: first resistance level, else 5% above the current price.
        safe_target_price = None
        try:
            resistance_levels = sr_analysis.get('support_resistance_levels', {}).get('resistance_levels', [])
            if resistance_levels:
                target = float(resistance_levels[0])
                if target == target:  # not NaN
                    safe_target_price = round(target, 2)
        except Exception:
            safe_target_price = None
        if safe_target_price is None:
            safe_target_price = safe_round(current_price * 1.05, 2)

        risk_metrics = {
            'stop_loss': safe_stop_loss,
            'target_price': safe_target_price,
            'profit_loss_ratio': 1.8,
            'position_size': 2,
            'risk_ratio': 2.5
        }
        ai_analysis = deepseek_agent.analyze_market(market_data, technical_indicators, trend_data, risk_metrics)
        recommendation = deepseek_agent.generate_trade_recommendation(ai_analysis)

        # Card payload
        product_name_cn = data_fetcher.get_product_name_cn(symbol)
        # Trend direction codes -> Chinese labels; unknown codes pass through.
        direction_labels = {
            'bullish': '多头',
            'bearish': '空头',
            'sideways': '震荡'
        }
        direction = trend_analysis.get('overall_trend', 'sideways')
        card_data = {
            'symbol': symbol,
            'name': f"{product_name_cn}({symbol})".upper(),
            'current_price': safe_round(current_price, 2),
            'direction': direction_labels.get(direction, direction),
            'win_rate': safe_round(win_rate, 1),
            'recommendation': recommendation,
            'technical_indicators': technical_indicators,
            'fund_flow': fund_flow_analysis,
            'rollover_warning': rollover_analysis
        }
        return jsonify(card_data)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/card/<symbol>')
def card(symbol):
    """Analysis card page.

    Renders ``card.html`` from the same payload that ``/api/card/<symbol>``
    serves, or ``error.html`` when that payload carries an ``error`` field
    or cannot be obtained.

    Args:
        symbol: symbol code taken from the URL.
    """
    try:
        # Call the API view in-process. An HTTP round trip to a hard-coded
        # localhost:5000 breaks whenever the app runs on another port or
        # behind a proxy, and can block a single-threaded server on itself.
        result = api_card(symbol)
        # api_card returns either a response or a (response, status) tuple.
        response = result[0] if isinstance(result, tuple) else result
        card_data = response.get_json()
        if 'error' in card_data:
            return render_template('error.html', message=card_data['error'])
        return render_template('card.html', **card_data)
    except Exception as e:
        print(f"获取卡片数据失败: {e}")
        return render_template('error.html', message=f"获取卡片数据失败: {str(e)}")
def _get_trend_strength_display(adx):
"""获取趋势强度显示文本"""
try:
adx_value = float(adx) if adx is not None else 0
if adx_value > 40:
return f"强势多头({round(adx_value, 1)})"
elif adx_value > 25:
return f"多头({round(adx_value, 1)})"
elif adx_value > 20:
return f"弱势多头({round(adx_value, 1)})"
else:
return f"震荡({round(adx_value, 1)})"
except (ValueError, TypeError):
return "震荡(0.0)"
def _get_rollover_warning(rollover_analysis):
"""获取换月预警信息"""
try:
days = rollover_analysis.get('days_to_delivery', 0)
level = rollover_analysis.get('warning_level', 'low')
try:
days_value = int(days) if days is not None else 0
except (ValueError, TypeError):
days_value = 0
if level == 'critical':
return f"⚠️ {abs(days_value)}"
elif level == 'high':
return f"⚠️ {days_value}"
elif level == 'medium':
return f"⚠️ {days_value}"
else:
return f"{days_value}"
except (ValueError, TypeError):
return "✅ 0 天"
def _get_kline_data_for_chart(kline_data):
"""获取图表用的K线数据"""
try:
chart_data = []
if kline_data is not None and not kline_data.empty:
for idx, row in kline_data.tail(30).iterrows():
try:
chart_data.append({
'date': idx.strftime('%Y-%m-%d') if hasattr(idx, 'strftime') else '2023-01-01',
'open': float(row.get('open', 0)),
'high': float(row.get('high', 0)),
'low': float(row.get('low', 0)),
'close': float(row.get('close', 0)),
'volume': float(row.get('volume', 0))
})
except (ValueError, TypeError):
continue
return chart_data
except (ValueError, TypeError):
return []
@app.route('/api/selected/add/<symbol>', methods=['POST'])
def add_selected_symbol(symbol):
    """Add a symbol to the in-memory watch-list.

    A symbol that is already on the list is not added again. The response
    always reports success together with the current watch-list.
    """
    already_listed = symbol in selected_symbols
    if not already_listed:
        selected_symbols.append(symbol)
    payload = {'status': 'success', 'selected_symbols': selected_symbols}
    return jsonify(payload)
@app.route('/api/selected/remove/<symbol>', methods=['POST'])
def remove_selected_symbol(symbol):
    """Remove a symbol from the in-memory watch-list.

    Removing a symbol that is not on the list is a no-op. The response
    always reports success together with the current watch-list.
    """
    try:
        selected_symbols.remove(symbol)
    except ValueError:
        # Not on the list: nothing to remove.
        pass
    payload = {'status': 'success', 'selected_symbols': selected_symbols}
    return jsonify(payload)
@app.route('/api/selected/list')
def get_selected_symbols():
    """Return the current in-memory watch-list as JSON."""
    payload = {'selected_symbols': selected_symbols}
    return jsonify(payload)
if __name__ == '__main__':
    # The server listens on all interfaces, and the Werkzeug debugger lets
    # anyone who can reach it run arbitrary code. Debug mode is therefore
    # opt-in: set FLASK_DEBUG=1 (or true/yes/on) for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug_enabled, host='0.0.0.0', port=5000)