|
|
# 压力支撑模块
|
|
|
import pandas as pd
|
|
|
import numpy as np
|
|
|
from typing import Dict, List, Tuple, Optional
|
|
|
from qihuo_analyzer.utils.technical_analysis import calculate_bollinger_bands
|
|
|
|
|
|
|
|
|
class SupportResistance:
    """Support/resistance analyzer for price bars.

    Every method takes a ``pandas.DataFrame`` and reads some of the
    ``high``, ``low``, ``close`` and ``volume`` columns. The last row is
    treated as the most recent bar (all look-backs use ``tail``/``iloc[-1]``).
    """

    # Look-back window (rows) for swing highs/lows and the volume profile.
    _LOOKBACK = 50
    # Windows (rows) for the multi-period high/low levels.
    _HIGH_LOW_PERIODS = (10, 20, 50)
    # Number of equal-width price bins in the volume profile.
    _VOLUME_BINS = 10
    # Average range assumed when 'avg_range' is missing or NaN.
    _DEFAULT_AVG_RANGE = 10

    def __init__(self):
        """Create an analyzer; it holds no state."""
        pass

    def analyze_support_resistance(self, data: pd.DataFrame) -> Dict:
        """Run every level calculation and merge the results into one dict.

        Args:
            data: Price bars with ``high``, ``low``, ``close`` and ``volume``.

        Returns:
            A flat dict holding the keys produced by the key-level, pivot,
            Bollinger and multi-period high/low helpers, plus
            ``'fibonacci_levels'`` and ``'support_resistance_levels'``.
        """
        result: Dict = {}
        result.update(self._identify_key_levels(data))
        result.update(self._calculate_pivot_points(data))
        result.update(self._calculate_bollinger_levels(data))
        result.update(self._analyze_recent_high_low(data))
        result['fibonacci_levels'] = self._calculate_fibonacci_levels(data)

        # Split levels around the real last close so the result agrees with
        # the close-based price used by the stop-loss / position methods.
        result['support_resistance_levels'] = self._generate_support_resistance_levels(
            result, current_price=self._latest_close(data)
        )
        return result

    @staticmethod
    def _latest_close(data: pd.DataFrame) -> Optional[float]:
        """Return the last ``close`` value, or None if absent, empty or NaN."""
        if len(data) == 0 or 'close' not in data.columns:
            return None
        latest = data['close'].iloc[-1]
        return None if pd.isna(latest) else latest

    def _identify_key_levels(self, data: pd.DataFrame) -> Dict:
        """Collect the recent extremes, average bar range and distributions.

        Returns:
            Dict with ``recent_high``, ``recent_low`` and ``avg_range`` taken
            over the last 50 rows, plus the ``volume_profile`` and
            ``price_density`` helper outputs.
        """
        window = data.tail(self._LOOKBACK)
        return {
            'recent_high': window['high'].max(),
            'recent_low': window['low'].min(),
            # Mean high-low spread of the window.
            'avg_range': (window['high'] - window['low']).mean(),
            'volume_profile': self._calculate_volume_profile(data),
            'price_density': self._calculate_price_density(data),
        }

    def _calculate_pivot_points(self, data: pd.DataFrame) -> Dict:
        """Compute the pivot point and two support/resistance levels.

        The levels are derived from the last row's high, low and close.
        With fewer than two rows every value is None.
        """
        keys = ('pivot_point', 'resistance_1', 'resistance_2', 'support_1', 'support_2')
        if len(data) < 2:
            return dict.fromkeys(keys)

        high = data['high'].iloc[-1]
        low = data['low'].iloc[-1]
        close = data['close'].iloc[-1]

        pivot = (high + low + close) / 3
        spread = high - low
        return {
            'pivot_point': pivot,
            'resistance_1': 2 * pivot - low,
            'resistance_2': pivot + spread,
            'support_1': 2 * pivot - high,
            'support_2': pivot - spread,
        }

    def _calculate_bollinger_levels(self, data: pd.DataFrame) -> Dict:
        """Return the latest Bollinger band values as candidate levels.

        Reads the last ``upper_band``, ``sma`` and ``lower_band`` entries of
        whatever ``calculate_bollinger_bands(data)`` returns.
        """
        bands = calculate_bollinger_bands(data)
        return {
            'bollinger_upper': bands['upper_band'].iloc[-1],
            'bollinger_middle': bands['sma'].iloc[-1],
            'bollinger_lower': bands['lower_band'].iloc[-1],
        }

    def _analyze_recent_high_low(self, data: pd.DataFrame) -> Dict:
        """Return the highest high / lowest low over 10, 20 and 50 rows.

        Keys are ``'<n>d_high'`` and ``'<n>d_low'``; a period longer than
        the available data yields None for both of its keys.
        """
        levels = {}
        for period in self._HIGH_LOW_PERIODS:
            enough = len(data) >= period
            levels[f'{period}d_high'] = data['high'].tail(period).max() if enough else None
            levels[f'{period}d_low'] = data['low'].tail(period).min() if enough else None
        return levels

    def _calculate_volume_profile(self, data: pd.DataFrame) -> Dict:
        """Build a simplified 10-bin volume-by-price profile of the last 50 rows.

        A bar's whole volume is added to every bin its high-low range
        overlaps. Returns an empty dict when there are fewer than 10 rows;
        otherwise a dict with ``'volume_profile'`` (``bin_1`` .. ``bin_10``)
        and ``'max_volume_bin'`` (the ``(name, info)`` pair with most volume).
        """
        if len(data) < 10:
            return {}

        # Use the same window for the price range and for the volume sums;
        # otherwise older bars leak volume into bins built from recent prices.
        window = data.tail(self._LOOKBACK)
        price_min = window['low'].min()
        price_max = window['high'].max()
        bin_size = (price_max - price_min) / self._VOLUME_BINS

        profile = {}
        for i in range(self._VOLUME_BINS):
            bin_low = price_min + i * bin_size
            bin_high = price_min + (i + 1) * bin_size
            overlaps = (window['low'] <= bin_high) & (window['high'] >= bin_low)
            profile[f'bin_{i+1}'] = {
                'price_range': (bin_low, bin_high),
                'volume': window.loc[overlaps, 'volume'].sum(),
            }

        # max() keeps the first bin on ties, i.e. the lowest-priced one.
        busiest = max(profile.items(), key=lambda item: item[1]['volume'])
        return {'volume_profile': profile, 'max_volume_bin': busiest}

    def _calculate_price_density(self, data: pd.DataFrame) -> Dict:
        """Summarise the distribution of the last 100 closing prices.

        Returns an empty dict when there are fewer than 10 rows; otherwise
        the mean, standard deviation and the 10/25/50/75/90th percentiles.
        """
        if len(data) < 10:
            return {}

        closes = data['close'].tail(100)
        return {
            'price_mean': closes.mean(),
            'price_std': closes.std(),
            'price_percentiles': {
                f'p{q}': np.percentile(closes, q) for q in (10, 25, 50, 75, 90)
            },
        }

    def _calculate_fibonacci_levels(self, data: pd.DataFrame) -> Dict:
        """Compute Fibonacci retracement levels between the 50-row extremes.

        Returns an empty dict when there are fewer than 20 rows; otherwise a
        dict keyed by the ratio as a string (``'0'`` .. ``'1'``), measured
        upward from the swing low.
        """
        if len(data) < 20:
            return {}

        swing_high = data['high'].tail(self._LOOKBACK).max()
        swing_low = data['low'].tail(self._LOOKBACK).min()
        swing = swing_high - swing_low

        levels = {'0': swing_low}
        for ratio in ('0.236', '0.382', '0.5', '0.618', '0.786'):
            levels[ratio] = swing_low + swing * float(ratio)
        levels['1'] = swing_high
        return levels

    def _generate_support_resistance_levels(
        self, analysis: Dict, current_price: Optional[float] = None
    ) -> Dict:
        """Merge all candidate levels into the nearest supports and resistances.

        Args:
            analysis: Dict of levels as assembled by
                ``analyze_support_resistance``; missing keys are ignored.
            current_price: Price used to split levels into support (below)
                and resistance (above). When None or NaN, 95% of
                ``analysis['recent_high']`` (default 3500) is used instead.

        Returns:
            Dict with up to three ``support_levels`` (nearest first), up to
            three ``resistance_levels`` (nearest first) and the
            ``current_price`` used. When there are no positive candidate
            levels, only the two (empty) lists are returned.
        """
        scalar_keys = [
            'recent_high', 'recent_low',
            'pivot_point', 'resistance_1', 'resistance_2', 'support_1', 'support_2',
            'bollinger_upper', 'bollinger_middle', 'bollinger_lower',
        ]
        for period in self._HIGH_LOW_PERIODS:
            scalar_keys += [f'{period}d_high', f'{period}d_low']

        candidates = [analysis.get(key, 0) for key in scalar_keys]
        candidates.extend(analysis.get('fibonacci_levels', {}).values())

        # Drop None, zero, negative and NaN entries (NaN > 0 is False).
        levels = sorted(lv for lv in candidates if lv is not None and lv > 0)
        if not levels:
            return {'support_levels': [], 'resistance_levels': []}

        # Levels closer than 30% of the average range count as one level.
        avg_range = analysis.get('avg_range', self._DEFAULT_AVG_RANGE)
        if avg_range is None or pd.isna(avg_range):
            # A NaN threshold would make every comparison False and collapse
            # all levels into the first one.
            avg_range = self._DEFAULT_AVG_RANGE
        threshold = avg_range * 0.3

        unique_levels = []
        for lv in levels:
            if not unique_levels or abs(lv - unique_levels[-1]) > threshold:
                unique_levels.append(lv)

        if current_price is None or pd.isna(current_price):
            # Legacy approximation for callers that cannot supply a price.
            current_price = analysis.get('recent_high', 3500) * 0.95

        # unique_levels is ascending: reverse the lower part so the nearest
        # support comes first; the upper part is already nearest-first.
        supports = [lv for lv in reversed(unique_levels) if lv < current_price]
        resistances = [lv for lv in unique_levels if lv > current_price]

        return {
            'support_levels': supports[:3],
            'resistance_levels': resistances[:3],
            'current_price': current_price,
        }

    def calculate_stop_loss_level(self, data: pd.DataFrame, direction: str, atr: float) -> float:
        """Compute a stop-loss price from the nearest level and the ATR.

        Args:
            data: Price bars passed to ``analyze_support_resistance``.
            direction: ``'long'`` or ``'short'``.
            atr: Average true range, in price units.

        Returns:
            Long: nearest support minus 0.5 * ATR, or last close minus
            2 * ATR when no support exists. Short: nearest resistance plus
            0.5 * ATR, or last close plus 2 * ATR when no resistance exists.

        Raises:
            ValueError: If ``direction`` is neither ``'long'`` nor ``'short'``.
        """
        sr_levels = self.analyze_support_resistance(data).get('support_resistance_levels', {})
        supports = sr_levels.get('support_levels', [])
        resistances = sr_levels.get('resistance_levels', [])
        last_close = data['close'].iloc[-1]

        if direction == 'long':
            if supports:
                return supports[0] - atr * 0.5
            return last_close - atr * 2
        if direction == 'short':
            if resistances:
                return resistances[0] + atr * 0.5
            return last_close + atr * 2
        raise ValueError("Direction must be 'long' or 'short'")

    def calculate_target_price(self, data: pd.DataFrame, direction: str, entry_price: float) -> float:
        """Compute a target price from the nearest opposing level.

        Args:
            data: Price bars passed to ``analyze_support_resistance``.
            direction: ``'long'`` or ``'short'``.
            entry_price: Entry price; only used to derive a +/-5% fallback
                when the analysis lacks ``recent_high`` / ``recent_low``.

        Returns:
            Long: nearest resistance, else the recent high. Short: nearest
            support, else the recent low.

        Raises:
            ValueError: If ``direction`` is neither ``'long'`` nor ``'short'``.
        """
        sr_analysis = self.analyze_support_resistance(data)
        sr_levels = sr_analysis.get('support_resistance_levels', {})
        supports = sr_levels.get('support_levels', [])
        resistances = sr_levels.get('resistance_levels', [])

        if direction == 'long':
            if resistances:
                return resistances[0]
            return sr_analysis.get('recent_high', entry_price * 1.05)
        if direction == 'short':
            if supports:
                return supports[0]
            return sr_analysis.get('recent_low', entry_price * 0.95)
        raise ValueError("Direction must be 'long' or 'short'")

    def analyze_price_position(self, data: pd.DataFrame) -> Dict:
        """Describe where the last close sits relative to nearby levels.

        Returns:
            Dict with ``current_price`` (last close), ``distance_to_support``
            and ``distance_to_resistance`` (``inf`` when no such level),
            ``position`` (``'near_resistance'``, ``'near_support'`` or
            ``'neutral'``; resistance is checked first),
            ``bollinger_position`` (``'upper'``, ``'lower'`` or ``'middle'``)
            and the ``support_levels`` / ``resistance_levels`` lists.
        """
        current_price = data['close'].iloc[-1]

        sr_analysis = self.analyze_support_resistance(data)
        sr_levels = sr_analysis.get('support_resistance_levels', {})
        support_levels = sr_levels.get('support_levels', [])
        resistance_levels = sr_levels.get('resistance_levels', [])

        distance_to_support = float('inf')
        distance_to_resistance = float('inf')
        if support_levels:
            distance_to_support = current_price - support_levels[0]
        if resistance_levels:
            distance_to_resistance = resistance_levels[0] - current_price

        # "Near" means within 20% of the average bar range.
        near = sr_analysis.get('avg_range', 10) * 0.2
        if distance_to_resistance < near:
            position = 'near_resistance'
        elif distance_to_support < near:
            position = 'near_support'
        else:
            position = 'neutral'

        if current_price > sr_analysis.get('bollinger_upper', 0):
            bollinger_position = 'upper'
        elif current_price < sr_analysis.get('bollinger_lower', 0):
            bollinger_position = 'lower'
        else:
            bollinger_position = 'middle'

        return {
            'current_price': current_price,
            'distance_to_support': distance_to_support,
            'distance_to_resistance': distance_to_resistance,
            'position': position,
            'bollinger_position': bollinger_position,
            'support_levels': support_levels,
            'resistance_levels': resistance_levels,
        }