You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

390 lines
14 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# 压力支撑模块
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional
from qihuo_analyzer.utils.technical_analysis import calculate_bollinger_bands
class SupportResistance:
"""压力支撑分析器"""
def __init__(self):
pass
def analyze_support_resistance(self, data: pd.DataFrame) -> Dict:
"""分析压力支撑位"""
result = {}
# 识别关键价位
key_levels = self._identify_key_levels(data)
result.update(key_levels)
# 计算枢轴点
pivot_points = self._calculate_pivot_points(data)
result.update(pivot_points)
# 基于布林带的支撑阻力
bollinger_levels = self._calculate_bollinger_levels(data)
result.update(bollinger_levels)
# 最近高低点分析
recent_high_low = self._analyze_recent_high_low(data)
result.update(recent_high_low)
# 斐波那契回调线
fibonacci_levels = self._calculate_fibonacci_levels(data)
result['fibonacci_levels'] = fibonacci_levels
# 综合支撑阻力位
support_resistance_levels = self._generate_support_resistance_levels(result)
result['support_resistance_levels'] = support_resistance_levels
return result
def _identify_key_levels(self, data: pd.DataFrame) -> Dict:
"""识别关键价位"""
# 计算最近N天的高低点
recent_high = data['high'].tail(50).max()
recent_low = data['low'].tail(50).min()
# 计算最近N天的平均波幅
avg_range = (data['high'] - data['low']).tail(50).mean()
# 识别成交量密集区
volume_profile = self._calculate_volume_profile(data)
# 识别价格密集区
price_density = self._calculate_price_density(data)
return {
'recent_high': recent_high,
'recent_low': recent_low,
'avg_range': avg_range,
'volume_profile': volume_profile,
'price_density': price_density
}
def _calculate_pivot_points(self, data: pd.DataFrame) -> Dict:
"""计算枢轴点"""
# 使用最近的高点、低点和收盘价计算枢轴点
if len(data) < 2:
return {
'pivot_point': None,
'resistance_1': None,
'resistance_2': None,
'support_1': None,
'support_2': None
}
high = data['high'].iloc[-1]
low = data['low'].iloc[-1]
close = data['close'].iloc[-1]
# 计算枢轴点
pivot_point = (high + low + close) / 3
# 计算阻力位和支撑位
resistance_1 = 2 * pivot_point - low
resistance_2 = pivot_point + (high - low)
support_1 = 2 * pivot_point - high
support_2 = pivot_point - (high - low)
return {
'pivot_point': pivot_point,
'resistance_1': resistance_1,
'resistance_2': resistance_2,
'support_1': support_1,
'support_2': support_2
}
def _calculate_bollinger_levels(self, data: pd.DataFrame) -> Dict:
"""基于布林带的支撑阻力"""
# 计算布林带
bollinger_data = calculate_bollinger_bands(data)
# 获取最新的布林带值
upper_band = bollinger_data['upper_band'].iloc[-1]
middle_band = bollinger_data['sma'].iloc[-1]
lower_band = bollinger_data['lower_band'].iloc[-1]
return {
'bollinger_upper': upper_band,
'bollinger_middle': middle_band,
'bollinger_lower': lower_band
}
def _analyze_recent_high_low(self, data: pd.DataFrame) -> Dict:
"""最近高低点分析"""
# 计算不同周期的高低点
periods = [10, 20, 50]
high_low_levels = {}
for period in periods:
if len(data) >= period:
high_low_levels[f'{period}d_high'] = data['high'].tail(period).max()
high_low_levels[f'{period}d_low'] = data['low'].tail(period).min()
else:
high_low_levels[f'{period}d_high'] = None
high_low_levels[f'{period}d_low'] = None
return high_low_levels
def _calculate_volume_profile(self, data: pd.DataFrame) -> Dict:
"""计算成交量分布"""
# 简化的成交量分布分析
# 将价格区间分成10个区间计算每个区间的成交量
if len(data) < 10:
return {}
price_min = data['low'].tail(50).min()
price_max = data['high'].tail(50).max()
price_range = price_max - price_min
bin_size = price_range / 10
volume_profile = {}
for i in range(10):
bin_low = price_min + i * bin_size
bin_high = price_min + (i + 1) * bin_size
# 计算该价格区间的成交量
bin_volume = data[
(data['low'] <= bin_high) &
(data['high'] >= bin_low)
]['volume'].sum()
volume_profile[f'bin_{i+1}'] = {
'price_range': (bin_low, bin_high),
'volume': bin_volume
}
# 找出成交量最大的区间
max_volume_bin = max(volume_profile.items(), key=lambda x: x[1]['volume'])
return {
'volume_profile': volume_profile,
'max_volume_bin': max_volume_bin
}
def _calculate_price_density(self, data: pd.DataFrame) -> Dict:
"""计算价格密度"""
# 简化的价格密度分析
if len(data) < 10:
return {}
# 计算收盘价的分布
prices = data['close'].tail(100)
price_std = prices.std()
price_mean = prices.mean()
# 计算价格分位数
price_percentiles = {
'p10': np.percentile(prices, 10),
'p25': np.percentile(prices, 25),
'p50': np.percentile(prices, 50),
'p75': np.percentile(prices, 75),
'p90': np.percentile(prices, 90)
}
return {
'price_mean': price_mean,
'price_std': price_std,
'price_percentiles': price_percentiles
}
def _calculate_fibonacci_levels(self, data: pd.DataFrame) -> Dict:
"""计算斐波那契回调线"""
if len(data) < 20:
return {}
# 找出最近的显著高低点
swing_high = data['high'].tail(50).max()
swing_low = data['low'].tail(50).min()
# 计算斐波那契回调位
range_high_low = swing_high - swing_low
fib_levels = {
'0': swing_low,
'0.236': swing_low + range_high_low * 0.236,
'0.382': swing_low + range_high_low * 0.382,
'0.5': swing_low + range_high_low * 0.5,
'0.618': swing_low + range_high_low * 0.618,
'0.786': swing_low + range_high_low * 0.786,
'1': swing_high
}
return fib_levels
def _generate_support_resistance_levels(self, analysis: Dict) -> Dict:
"""生成综合支撑阻力位"""
# 收集所有可能的支撑阻力位
all_levels = []
# 添加最近高低点
all_levels.append(analysis.get('recent_high', 0))
all_levels.append(analysis.get('recent_low', 0))
# 添加枢轴点相关价位
all_levels.extend([
analysis.get('pivot_point', 0),
analysis.get('resistance_1', 0),
analysis.get('resistance_2', 0),
analysis.get('support_1', 0),
analysis.get('support_2', 0)
])
# 添加布林带相关价位
all_levels.extend([
analysis.get('bollinger_upper', 0),
analysis.get('bollinger_middle', 0),
analysis.get('bollinger_lower', 0)
])
# 添加不同周期的高低点
periods = [10, 20, 50]
for period in periods:
all_levels.append(analysis.get(f'{period}d_high', 0))
all_levels.append(analysis.get(f'{period}d_low', 0))
# 添加斐波那契回调位
fib_levels = analysis.get('fibonacci_levels', {})
all_levels.extend(fib_levels.values())
# 过滤无效值并排序
all_levels = [level for level in all_levels if level and level > 0]
all_levels.sort()
# 去重(相近的价位视为同一价位)
if not all_levels:
return {'support_levels': [], 'resistance_levels': []}
unique_levels = []
threshold = analysis.get('avg_range', 10) * 0.3 # 阈值为平均波幅的30%
for level in all_levels:
if not unique_levels or abs(level - unique_levels[-1]) > threshold:
unique_levels.append(level)
# 确定当前价格
current_price = analysis.get('recent_high', 3500) * 0.95 # 使用最近高点的95%作为当前价格
# 分离支撑位和阻力位
support_levels = [level for level in unique_levels if level < current_price]
resistance_levels = [level for level in unique_levels if level > current_price]
# 按距离当前价格排序
support_levels.sort(reverse=True) # 最近的支撑位在前
resistance_levels.sort() # 最近的阻力位在前
# 取最近的几个支撑阻力位
support_levels = support_levels[:3] # 最近的3个支撑位
resistance_levels = resistance_levels[:3] # 最近的3个阻力位
return {
'support_levels': support_levels,
'resistance_levels': resistance_levels,
'current_price': current_price
}
def calculate_stop_loss_level(self, data: pd.DataFrame, direction: str, atr: float) -> float:
"""计算智能止损位"""
# 分析支撑阻力位
sr_analysis = self.analyze_support_resistance(data)
support_levels = sr_analysis.get('support_resistance_levels', {}).get('support_levels', [])
resistance_levels = sr_analysis.get('support_resistance_levels', {}).get('resistance_levels', [])
current_price = data['close'].iloc[-1]
if direction == 'long':
# 做多时,止损位应在最近的支撑位下方
if support_levels:
# 最近的支撑位下方ATR的0.5倍
stop_loss = support_levels[0] - atr * 0.5
else:
# 没有支撑位时使用ATR的2倍
stop_loss = current_price - atr * 2
elif direction == 'short':
# 做空时,止损位应在最近的阻力位上方
if resistance_levels:
# 最近的阻力位上方ATR的0.5倍
stop_loss = resistance_levels[0] + atr * 0.5
else:
# 没有阻力位时使用ATR的2倍
stop_loss = current_price + atr * 2
else:
raise ValueError("Direction must be 'long' or 'short'")
return stop_loss
def calculate_target_price(self, data: pd.DataFrame, direction: str, entry_price: float) -> float:
"""计算目标价"""
# 分析支撑阻力位
sr_analysis = self.analyze_support_resistance(data)
support_levels = sr_analysis.get('support_resistance_levels', {}).get('support_levels', [])
resistance_levels = sr_analysis.get('support_resistance_levels', {}).get('resistance_levels', [])
if direction == 'long':
# 做多时,目标价应在最近的阻力位
if resistance_levels:
target_price = resistance_levels[0]
else:
# 没有阻力位时,使用近期高点
target_price = sr_analysis.get('recent_high', entry_price * 1.05)
elif direction == 'short':
# 做空时,目标价应在最近的支撑位
if support_levels:
target_price = support_levels[0]
else:
# 没有支撑位时,使用近期低点
target_price = sr_analysis.get('recent_low', entry_price * 0.95)
else:
raise ValueError("Direction must be 'long' or 'short'")
return target_price
def analyze_price_position(self, data: pd.DataFrame) -> Dict:
"""分析价格位置"""
current_price = data['close'].iloc[-1]
# 分析支撑阻力位
sr_analysis = self.analyze_support_resistance(data)
support_levels = sr_analysis.get('support_resistance_levels', {}).get('support_levels', [])
resistance_levels = sr_analysis.get('support_resistance_levels', {}).get('resistance_levels', [])
# 计算价格与支撑阻力位的距离
distance_to_support = float('inf')
distance_to_resistance = float('inf')
if support_levels:
distance_to_support = current_price - support_levels[0]
if resistance_levels:
distance_to_resistance = resistance_levels[0] - current_price
# 分析价格位置
position = 'neutral'
if distance_to_resistance < sr_analysis.get('avg_range', 10) * 0.2:
position = 'near_resistance'
elif distance_to_support < sr_analysis.get('avg_range', 10) * 0.2:
position = 'near_support'
# 分析价格在布林带中的位置
bollinger_upper = sr_analysis.get('bollinger_upper', 0)
bollinger_middle = sr_analysis.get('bollinger_middle', 0)
bollinger_lower = sr_analysis.get('bollinger_lower', 0)
bollinger_position = 'middle'
if current_price > bollinger_upper:
bollinger_position = 'upper'
elif current_price < bollinger_lower:
bollinger_position = 'lower'
return {
'current_price': current_price,
'distance_to_support': distance_to_support,
'distance_to_resistance': distance_to_resistance,
'position': position,
'bollinger_position': bollinger_position,
'support_levels': support_levels,
'resistance_levels': resistance_levels
}