# You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
#
# 390 lines
# 14 KiB

# Support and resistance module
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional
from qihuo_analyzer.utils.technical_analysis import calculate_bollinger_bands
class SupportResistance:
"""压力支撑分析器"""
def __init__(self):
pass
def analyze_support_resistance(self, data: pd.DataFrame) -> Dict:
"""分析压力支撑位"""
result = {}
# 识别关键价位
key_levels = self._identify_key_levels(data)
result.update(key_levels)
# 计算枢轴点
pivot_points = self._calculate_pivot_points(data)
result.update(pivot_points)
# 基于布林带的支撑阻力
bollinger_levels = self._calculate_bollinger_levels(data)
result.update(bollinger_levels)
# 最近高低点分析
recent_high_low = self._analyze_recent_high_low(data)
result.update(recent_high_low)
# 斐波那契回调线
fibonacci_levels = self._calculate_fibonacci_levels(data)
result['fibonacci_levels'] = fibonacci_levels
# 综合支撑阻力位
support_resistance_levels = self._generate_support_resistance_levels(result)
result['support_resistance_levels'] = support_resistance_levels
return result
def _identify_key_levels(self, data: pd.DataFrame) -> Dict:
"""识别关键价位"""
# 计算最近N天的高低点
recent_high = data['high'].tail(50).max()
recent_low = data['low'].tail(50).min()
# 计算最近N天的平均波幅
avg_range = (data['high'] - data['low']).tail(50).mean()
# 识别成交量密集区
volume_profile = self._calculate_volume_profile(data)
# 识别价格密集区
price_density = self._calculate_price_density(data)
return {
'recent_high': recent_high,
'recent_low': recent_low,
'avg_range': avg_range,
'volume_profile': volume_profile,
'price_density': price_density
}
def _calculate_pivot_points(self, data: pd.DataFrame) -> Dict:
"""计算枢轴点"""
# 使用最近的高点、低点和收盘价计算枢轴点
if len(data) < 2:
return {
'pivot_point': None,
'resistance_1': None,
'resistance_2': None,
'support_1': None,
'support_2': None
}
high = data['high'].iloc[-1]
low = data['low'].iloc[-1]
close = data['close'].iloc[-1]
# 计算枢轴点
pivot_point = (high + low + close) / 3
# 计算阻力位和支撑位
resistance_1 = 2 * pivot_point - low
resistance_2 = pivot_point + (high - low)
support_1 = 2 * pivot_point - high
support_2 = pivot_point - (high - low)
return {
'pivot_point': pivot_point,
'resistance_1': resistance_1,
'resistance_2': resistance_2,
'support_1': support_1,
'support_2': support_2
}
def _calculate_bollinger_levels(self, data: pd.DataFrame) -> Dict:
"""基于布林带的支撑阻力"""
# 计算布林带
bollinger_data = calculate_bollinger_bands(data)
# 获取最新的布林带值
upper_band = bollinger_data['upper_band'].iloc[-1]
middle_band = bollinger_data['sma'].iloc[-1]
lower_band = bollinger_data['lower_band'].iloc[-1]
return {
'bollinger_upper': upper_band,
'bollinger_middle': middle_band,
'bollinger_lower': lower_band
}
def _analyze_recent_high_low(self, data: pd.DataFrame) -> Dict:
"""最近高低点分析"""
# 计算不同周期的高低点
periods = [10, 20, 50]
high_low_levels = {}
for period in periods:
if len(data) >= period:
high_low_levels[f'{period}d_high'] = data['high'].tail(period).max()
high_low_levels[f'{period}d_low'] = data['low'].tail(period).min()
else:
high_low_levels[f'{period}d_high'] = None
high_low_levels[f'{period}d_low'] = None
return high_low_levels
def _calculate_volume_profile(self, data: pd.DataFrame) -> Dict:
"""计算成交量分布"""
# 简化的成交量分布分析
# 将价格区间分成10个区间计算每个区间的成交量
if len(data) < 10:
return {}
price_min = data['low'].tail(50).min()
price_max = data['high'].tail(50).max()
price_range = price_max - price_min
bin_size = price_range / 10
volume_profile = {}
for i in range(10):
bin_low = price_min + i * bin_size
bin_high = price_min + (i + 1) * bin_size
# 计算该价格区间的成交量
bin_volume = data[
(data['low'] <= bin_high) &
(data['high'] >= bin_low)
]['volume'].sum()
volume_profile[f'bin_{i+1}'] = {
'price_range': (bin_low, bin_high),
'volume': bin_volume
}
# 找出成交量最大的区间
max_volume_bin = max(volume_profile.items(), key=lambda x: x[1]['volume'])
return {
'volume_profile': volume_profile,
'max_volume_bin': max_volume_bin
}
def _calculate_price_density(self, data: pd.DataFrame) -> Dict:
"""计算价格密度"""
# 简化的价格密度分析
if len(data) < 10:
return {}
# 计算收盘价的分布
prices = data['close'].tail(100)
price_std = prices.std()
price_mean = prices.mean()
# 计算价格分位数
price_percentiles = {
'p10': np.percentile(prices, 10),
'p25': np.percentile(prices, 25),
'p50': np.percentile(prices, 50),
'p75': np.percentile(prices, 75),
'p90': np.percentile(prices, 90)
}
return {
'price_mean': price_mean,
'price_std': price_std,
'price_percentiles': price_percentiles
}
def _calculate_fibonacci_levels(self, data: pd.DataFrame) -> Dict:
"""计算斐波那契回调线"""
if len(data) < 20:
return {}
# 找出最近的显著高低点
swing_high = data['high'].tail(50).max()
swing_low = data['low'].tail(50).min()
# 计算斐波那契回调位
range_high_low = swing_high - swing_low
fib_levels = {
'0': swing_low,
'0.236': swing_low + range_high_low * 0.236,
'0.382': swing_low + range_high_low * 0.382,
'0.5': swing_low + range_high_low * 0.5,
'0.618': swing_low + range_high_low * 0.618,
'0.786': swing_low + range_high_low * 0.786,
'1': swing_high
}
return fib_levels
def _generate_support_resistance_levels(self, analysis: Dict) -> Dict:
"""生成综合支撑阻力位"""
# 收集所有可能的支撑阻力位
all_levels = []
# 添加最近高低点
all_levels.append(analysis.get('recent_high', 0))
all_levels.append(analysis.get('recent_low', 0))
# 添加枢轴点相关价位
all_levels.extend([
analysis.get('pivot_point', 0),
analysis.get('resistance_1', 0),
analysis.get('resistance_2', 0),
analysis.get('support_1', 0),
analysis.get('support_2', 0)
])
# 添加布林带相关价位
all_levels.extend([
analysis.get('bollinger_upper', 0),
analysis.get('bollinger_middle', 0),
analysis.get('bollinger_lower', 0)
])
# 添加不同周期的高低点
periods = [10, 20, 50]
for period in periods:
all_levels.append(analysis.get(f'{period}d_high', 0))
all_levels.append(analysis.get(f'{period}d_low', 0))
# 添加斐波那契回调位
fib_levels = analysis.get('fibonacci_levels', {})
all_levels.extend(fib_levels.values())
# 过滤无效值并排序
all_levels = [level for level in all_levels if level and level > 0]
all_levels.sort()
# 去重(相近的价位视为同一价位)
if not all_levels:
return {'support_levels': [], 'resistance_levels': []}
unique_levels = []
threshold = analysis.get('avg_range', 10) * 0.3 # 阈值为平均波幅的30%
for level in all_levels:
if not unique_levels or abs(level - unique_levels[-1]) > threshold:
unique_levels.append(level)
# 确定当前价格
current_price = analysis.get('recent_high', 3500) * 0.95 # 使用最近高点的95%作为当前价格
# 分离支撑位和阻力位
support_levels = [level for level in unique_levels if level < current_price]
resistance_levels = [level for level in unique_levels if level > current_price]
# 按距离当前价格排序
support_levels.sort(reverse=True) # 最近的支撑位在前
resistance_levels.sort() # 最近的阻力位在前
# 取最近的几个支撑阻力位
support_levels = support_levels[:3] # 最近的3个支撑位
resistance_levels = resistance_levels[:3] # 最近的3个阻力位
return {
'support_levels': support_levels,
'resistance_levels': resistance_levels,
'current_price': current_price
}
def calculate_stop_loss_level(self, data: pd.DataFrame, direction: str, atr: float) -> float:
"""计算智能止损位"""
# 分析支撑阻力位
sr_analysis = self.analyze_support_resistance(data)
support_levels = sr_analysis.get('support_resistance_levels', {}).get('support_levels', [])
resistance_levels = sr_analysis.get('support_resistance_levels', {}).get('resistance_levels', [])
current_price = data['close'].iloc[-1]
if direction == 'long':
# 做多时,止损位应在最近的支撑位下方
if support_levels:
# 最近的支撑位下方ATR的0.5倍
stop_loss = support_levels[0] - atr * 0.5
else:
# 没有支撑位时使用ATR的2倍
stop_loss = current_price - atr * 2
elif direction == 'short':
# 做空时,止损位应在最近的阻力位上方
if resistance_levels:
# 最近的阻力位上方ATR的0.5倍
stop_loss = resistance_levels[0] + atr * 0.5
else:
# 没有阻力位时使用ATR的2倍
stop_loss = current_price + atr * 2
else:
raise ValueError("Direction must be 'long' or 'short'")
return stop_loss
def calculate_target_price(self, data: pd.DataFrame, direction: str, entry_price: float) -> float:
"""计算目标价"""
# 分析支撑阻力位
sr_analysis = self.analyze_support_resistance(data)
support_levels = sr_analysis.get('support_resistance_levels', {}).get('support_levels', [])
resistance_levels = sr_analysis.get('support_resistance_levels', {}).get('resistance_levels', [])
if direction == 'long':
# 做多时,目标价应在最近的阻力位
if resistance_levels:
target_price = resistance_levels[0]
else:
# 没有阻力位时,使用近期高点
target_price = sr_analysis.get('recent_high', entry_price * 1.05)
elif direction == 'short':
# 做空时,目标价应在最近的支撑位
if support_levels:
target_price = support_levels[0]
else:
# 没有支撑位时,使用近期低点
target_price = sr_analysis.get('recent_low', entry_price * 0.95)
else:
raise ValueError("Direction must be 'long' or 'short'")
return target_price
def analyze_price_position(self, data: pd.DataFrame) -> Dict:
"""分析价格位置"""
current_price = data['close'].iloc[-1]
# 分析支撑阻力位
sr_analysis = self.analyze_support_resistance(data)
support_levels = sr_analysis.get('support_resistance_levels', {}).get('support_levels', [])
resistance_levels = sr_analysis.get('support_resistance_levels', {}).get('resistance_levels', [])
# 计算价格与支撑阻力位的距离
distance_to_support = float('inf')
distance_to_resistance = float('inf')
if support_levels:
distance_to_support = current_price - support_levels[0]
if resistance_levels:
distance_to_resistance = resistance_levels[0] - current_price
# 分析价格位置
position = 'neutral'
if distance_to_resistance < sr_analysis.get('avg_range', 10) * 0.2:
position = 'near_resistance'
elif distance_to_support < sr_analysis.get('avg_range', 10) * 0.2:
position = 'near_support'
# 分析价格在布林带中的位置
bollinger_upper = sr_analysis.get('bollinger_upper', 0)
bollinger_middle = sr_analysis.get('bollinger_middle', 0)
bollinger_lower = sr_analysis.get('bollinger_lower', 0)
bollinger_position = 'middle'
if current_price > bollinger_upper:
bollinger_position = 'upper'
elif current_price < bollinger_lower:
bollinger_position = 'lower'
return {
'current_price': current_price,
'distance_to_support': distance_to_support,
'distance_to_resistance': distance_to_resistance,
'position': position,
'bollinger_position': bollinger_position,
'support_levels': support_levels,
'resistance_levels': resistance_levels
}