# 压力支撑模块 import pandas as pd import numpy as np from typing import Dict, List, Tuple, Optional from qihuo_analyzer.utils.technical_analysis import calculate_bollinger_bands class SupportResistance: """压力支撑分析器""" def __init__(self): pass def analyze_support_resistance(self, data: pd.DataFrame) -> Dict: """分析压力支撑位""" result = {} # 识别关键价位 key_levels = self._identify_key_levels(data) result.update(key_levels) # 计算枢轴点 pivot_points = self._calculate_pivot_points(data) result.update(pivot_points) # 基于布林带的支撑阻力 bollinger_levels = self._calculate_bollinger_levels(data) result.update(bollinger_levels) # 最近高低点分析 recent_high_low = self._analyze_recent_high_low(data) result.update(recent_high_low) # 斐波那契回调线 fibonacci_levels = self._calculate_fibonacci_levels(data) result['fibonacci_levels'] = fibonacci_levels # 综合支撑阻力位 support_resistance_levels = self._generate_support_resistance_levels(result) result['support_resistance_levels'] = support_resistance_levels return result def _identify_key_levels(self, data: pd.DataFrame) -> Dict: """识别关键价位""" # 计算最近N天的高低点 recent_high = data['high'].tail(50).max() recent_low = data['low'].tail(50).min() # 计算最近N天的平均波幅 avg_range = (data['high'] - data['low']).tail(50).mean() # 识别成交量密集区 volume_profile = self._calculate_volume_profile(data) # 识别价格密集区 price_density = self._calculate_price_density(data) return { 'recent_high': recent_high, 'recent_low': recent_low, 'avg_range': avg_range, 'volume_profile': volume_profile, 'price_density': price_density } def _calculate_pivot_points(self, data: pd.DataFrame) -> Dict: """计算枢轴点""" # 使用最近的高点、低点和收盘价计算枢轴点 if len(data) < 2: return { 'pivot_point': None, 'resistance_1': None, 'resistance_2': None, 'support_1': None, 'support_2': None } high = data['high'].iloc[-1] low = data['low'].iloc[-1] close = data['close'].iloc[-1] # 计算枢轴点 pivot_point = (high + low + close) / 3 # 计算阻力位和支撑位 resistance_1 = 2 * pivot_point - low resistance_2 = pivot_point + (high - low) support_1 = 2 * pivot_point - high support_2 = pivot_point - (high - low) return { 'pivot_point': pivot_point, 'resistance_1': resistance_1, 'resistance_2': resistance_2, 'support_1': support_1, 'support_2': support_2 } def _calculate_bollinger_levels(self, data: pd.DataFrame) -> Dict: """基于布林带的支撑阻力""" # 计算布林带 bollinger_data = calculate_bollinger_bands(data) # 获取最新的布林带值 upper_band = bollinger_data['upper_band'].iloc[-1] middle_band = bollinger_data['sma'].iloc[-1] lower_band = bollinger_data['lower_band'].iloc[-1] return { 'bollinger_upper': upper_band, 'bollinger_middle': middle_band, 'bollinger_lower': lower_band } def _analyze_recent_high_low(self, data: pd.DataFrame) -> Dict: """最近高低点分析""" # 计算不同周期的高低点 periods = [10, 20, 50] high_low_levels = {} for period in periods: if len(data) >= period: high_low_levels[f'{period}d_high'] = data['high'].tail(period).max() high_low_levels[f'{period}d_low'] = data['low'].tail(period).min() else: high_low_levels[f'{period}d_high'] = None high_low_levels[f'{period}d_low'] = None return high_low_levels def _calculate_volume_profile(self, data: pd.DataFrame) -> Dict: """计算成交量分布""" # 简化的成交量分布分析 # 将价格区间分成10个区间,计算每个区间的成交量 if len(data) < 10: return {} price_min = data['low'].tail(50).min() price_max = data['high'].tail(50).max() price_range = price_max - price_min bin_size = price_range / 10 volume_profile = {} for i in range(10): bin_low = price_min + i * bin_size bin_high = price_min + (i + 1) * bin_size # 计算该价格区间的成交量 bin_volume = data[ (data['low'] <= bin_high) & (data['high'] >= bin_low) ]['volume'].sum() volume_profile[f'bin_{i+1}'] = { 'price_range': (bin_low, bin_high), 'volume': bin_volume } # 找出成交量最大的区间 max_volume_bin = max(volume_profile.items(), key=lambda x: x[1]['volume']) return { 'volume_profile': volume_profile, 'max_volume_bin': max_volume_bin } def _calculate_price_density(self, data: pd.DataFrame) -> Dict: """计算价格密度""" # 简化的价格密度分析 if len(data) < 10: return {} # 计算收盘价的分布 prices = data['close'].tail(100) price_std = prices.std() price_mean = prices.mean() # 计算价格分位数 price_percentiles = { 'p10': np.percentile(prices, 10), 'p25': np.percentile(prices, 25), 'p50': np.percentile(prices, 50), 'p75': np.percentile(prices, 75), 'p90': np.percentile(prices, 90) } return { 'price_mean': price_mean, 'price_std': price_std, 'price_percentiles': price_percentiles } def _calculate_fibonacci_levels(self, data: pd.DataFrame) -> Dict: """计算斐波那契回调线""" if len(data) < 20: return {} # 找出最近的显著高低点 swing_high = data['high'].tail(50).max() swing_low = data['low'].tail(50).min() # 计算斐波那契回调位 range_high_low = swing_high - swing_low fib_levels = { '0': swing_low, '0.236': swing_low + range_high_low * 0.236, '0.382': swing_low + range_high_low * 0.382, '0.5': swing_low + range_high_low * 0.5, '0.618': swing_low + range_high_low * 0.618, '0.786': swing_low + range_high_low * 0.786, '1': swing_high } return fib_levels def _generate_support_resistance_levels(self, analysis: Dict) -> Dict: """生成综合支撑阻力位""" # 收集所有可能的支撑阻力位 all_levels = [] # 添加最近高低点 all_levels.append(analysis.get('recent_high', 0)) all_levels.append(analysis.get('recent_low', 0)) # 添加枢轴点相关价位 all_levels.extend([ analysis.get('pivot_point', 0), analysis.get('resistance_1', 0), analysis.get('resistance_2', 0), analysis.get('support_1', 0), analysis.get('support_2', 0) ]) # 添加布林带相关价位 all_levels.extend([ analysis.get('bollinger_upper', 0), analysis.get('bollinger_middle', 0), analysis.get('bollinger_lower', 0) ]) # 添加不同周期的高低点 periods = [10, 20, 50] for period in periods: all_levels.append(analysis.get(f'{period}d_high', 0)) all_levels.append(analysis.get(f'{period}d_low', 0)) # 添加斐波那契回调位 fib_levels = analysis.get('fibonacci_levels', {}) all_levels.extend(fib_levels.values()) # 过滤无效值并排序 all_levels = [level for level in all_levels if level and level > 0] all_levels.sort() # 去重(相近的价位视为同一价位) if not all_levels: return {'support_levels': [], 'resistance_levels': []} unique_levels = [] threshold = analysis.get('avg_range', 10) * 0.3 # 阈值为平均波幅的30% for level in all_levels: if not unique_levels or abs(level - unique_levels[-1]) > threshold: unique_levels.append(level) # 确定当前价格 current_price = analysis.get('recent_high', 3500) * 0.95 # 使用最近高点的95%作为当前价格 # 分离支撑位和阻力位 support_levels = [level for level in unique_levels if level < current_price] resistance_levels = [level for level in unique_levels if level > current_price] # 按距离当前价格排序 support_levels.sort(reverse=True) # 最近的支撑位在前 resistance_levels.sort() # 最近的阻力位在前 # 取最近的几个支撑阻力位 support_levels = support_levels[:3] # 最近的3个支撑位 resistance_levels = resistance_levels[:3] # 最近的3个阻力位 return { 'support_levels': support_levels, 'resistance_levels': resistance_levels, 'current_price': current_price } def calculate_stop_loss_level(self, data: pd.DataFrame, direction: str, atr: float) -> float: """计算智能止损位""" # 分析支撑阻力位 sr_analysis = self.analyze_support_resistance(data) support_levels = sr_analysis.get('support_resistance_levels', {}).get('support_levels', []) resistance_levels = sr_analysis.get('support_resistance_levels', {}).get('resistance_levels', []) current_price = data['close'].iloc[-1] if direction == 'long': # 做多时,止损位应在最近的支撑位下方 if support_levels: # 最近的支撑位下方ATR的0.5倍 stop_loss = support_levels[0] - atr * 0.5 else: # 没有支撑位时,使用ATR的2倍 stop_loss = current_price - atr * 2 elif direction == 'short': # 做空时,止损位应在最近的阻力位上方 if resistance_levels: # 最近的阻力位上方ATR的0.5倍 stop_loss = resistance_levels[0] + atr * 0.5 else: # 没有阻力位时,使用ATR的2倍 stop_loss = current_price + atr * 2 else: raise ValueError("Direction must be 'long' or 'short'") return stop_loss def calculate_target_price(self, data: pd.DataFrame, direction: str, entry_price: float) -> float: """计算目标价""" # 分析支撑阻力位 sr_analysis = self.analyze_support_resistance(data) support_levels = sr_analysis.get('support_resistance_levels', {}).get('support_levels', []) resistance_levels = sr_analysis.get('support_resistance_levels', {}).get('resistance_levels', []) if direction == 'long': # 做多时,目标价应在最近的阻力位 if resistance_levels: target_price = resistance_levels[0] else: # 没有阻力位时,使用近期高点 target_price = sr_analysis.get('recent_high', entry_price * 1.05) elif direction == 'short': # 做空时,目标价应在最近的支撑位 if support_levels: target_price = support_levels[0] else: # 没有支撑位时,使用近期低点 target_price = sr_analysis.get('recent_low', entry_price * 0.95) else: raise ValueError("Direction must be 'long' or 'short'") return target_price def analyze_price_position(self, data: pd.DataFrame) -> Dict: """分析价格位置""" current_price = data['close'].iloc[-1] # 分析支撑阻力位 sr_analysis = self.analyze_support_resistance(data) support_levels = sr_analysis.get('support_resistance_levels', {}).get('support_levels', []) resistance_levels = sr_analysis.get('support_resistance_levels', {}).get('resistance_levels', []) # 计算价格与支撑阻力位的距离 distance_to_support = float('inf') distance_to_resistance = float('inf') if support_levels: distance_to_support = current_price - support_levels[0] if resistance_levels: distance_to_resistance = resistance_levels[0] - current_price # 分析价格位置 position = 'neutral' if distance_to_resistance < sr_analysis.get('avg_range', 10) * 0.2: position = 'near_resistance' elif distance_to_support < sr_analysis.get('avg_range', 10) * 0.2: position = 'near_support' # 分析价格在布林带中的位置 bollinger_upper = sr_analysis.get('bollinger_upper', 0) bollinger_middle = sr_analysis.get('bollinger_middle', 0) bollinger_lower = sr_analysis.get('bollinger_lower', 0) bollinger_position = 'middle' if current_price > bollinger_upper: bollinger_position = 'upper' elif current_price < bollinger_lower: bollinger_position = 'lower' return { 'current_price': current_price, 'distance_to_support': distance_to_support, 'distance_to_resistance': distance_to_resistance, 'position': position, 'bollinger_position': bollinger_position, 'support_levels': support_levels, 'resistance_levels': resistance_levels }