You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
154 lines
4.5 KiB
154 lines
4.5 KiB
# 技术分析工具
|
|
import numpy as np
|
|
import pandas as pd
|
|
from typing import Dict, List, Tuple
|
|
|
|
|
|
def calculate_macd(data: pd.DataFrame, fast_period: int = 12, slow_period: int = 26, signal_period: int = 9) -> Dict[str, pd.Series]:
|
|
"""计算MACD指标"""
|
|
exp1 = data['close'].ewm(span=fast_period, adjust=False).mean()
|
|
exp2 = data['close'].ewm(span=slow_period, adjust=False).mean()
|
|
macd = exp1 - exp2
|
|
signal = macd.ewm(span=signal_period, adjust=False).mean()
|
|
histogram = macd - signal
|
|
|
|
return {
|
|
'macd': macd,
|
|
'signal': signal,
|
|
'histogram': histogram
|
|
}
|
|
|
|
|
|
def calculate_rsi(data: pd.DataFrame, period: int = 14) -> pd.Series:
|
|
"""计算RSI指标"""
|
|
delta = data['close'].diff()
|
|
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
|
|
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
|
|
|
|
rs = gain / loss
|
|
rsi = 100 - (100 / (1 + rs))
|
|
|
|
return rsi
|
|
|
|
|
|
def calculate_bollinger_bands(data: pd.DataFrame, period: int = 20, std_dev: float = 2.0) -> Dict[str, pd.Series]:
|
|
"""计算布林带"""
|
|
sma = data['close'].rolling(window=period).mean()
|
|
std = data['close'].rolling(window=period).std()
|
|
upper_band = sma + (std * std_dev)
|
|
lower_band = sma - (std * std_dev)
|
|
|
|
return {
|
|
'sma': sma,
|
|
'upper_band': upper_band,
|
|
'lower_band': lower_band
|
|
}
|
|
|
|
|
|
def calculate_kdj(data: pd.DataFrame, period: int = 9, signal_period: int = 3) -> Dict[str, pd.Series]:
|
|
"""计算KDJ指标"""
|
|
low_min = data['low'].rolling(window=period).min()
|
|
high_max = data['high'].rolling(window=period).max()
|
|
|
|
rsv = (data['close'] - low_min) / (high_max - low_min) * 100
|
|
k = rsv.ewm(alpha=1/signal_period, adjust=False).mean()
|
|
d = k.ewm(alpha=1/signal_period, adjust=False).mean()
|
|
j = 3 * k - 2 * d
|
|
|
|
return {
|
|
'k': k,
|
|
'd': d,
|
|
'j': j
|
|
}
|
|
|
|
|
|
def calculate_adx(data: pd.DataFrame, period: int = 14) -> Dict[str, pd.Series]:
|
|
"""计算ADX指标"""
|
|
high = data['high']
|
|
low = data['low']
|
|
close = data['close']
|
|
|
|
tr1 = high - low
|
|
tr2 = abs(high - close.shift())
|
|
tr3 = abs(low - close.shift())
|
|
tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
|
|
|
|
plus_dm = high.diff()
|
|
minus_dm = low.diff()
|
|
|
|
plus_dm[plus_dm < 0] = 0
|
|
minus_dm[minus_dm > 0] = 0
|
|
minus_dm = abs(minus_dm)
|
|
|
|
atr = tr.rolling(window=period).mean()
|
|
plus_di = (plus_dm.rolling(window=period).mean() / atr) * 100
|
|
minus_di = (minus_dm.rolling(window=period).mean() / atr) * 100
|
|
|
|
dx = (abs(plus_di - minus_di) / (plus_di + minus_di)) * 100
|
|
adx = dx.rolling(window=period).mean()
|
|
|
|
return {
|
|
'adx': adx,
|
|
'plus_di': plus_di,
|
|
'minus_di': minus_di
|
|
}
|
|
|
|
|
|
def calculate_atr(data: pd.DataFrame, period: int = 14) -> pd.Series:
|
|
"""计算ATR指标"""
|
|
high = data['high']
|
|
low = data['low']
|
|
close = data['close']
|
|
|
|
tr1 = high - low
|
|
tr2 = abs(high - close.shift())
|
|
tr3 = abs(low - close.shift())
|
|
tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
|
|
|
|
atr = tr.rolling(window=period).mean()
|
|
|
|
return atr
|
|
|
|
|
|
def calculate_moving_average(data: pd.DataFrame, periods: List[int]) -> Dict[str, pd.Series]:
|
|
"""计算移动平均线"""
|
|
mas = {}
|
|
for period in periods:
|
|
mas[f'ma{period}'] = data['close'].rolling(window=period).mean()
|
|
|
|
return mas
|
|
|
|
|
|
def calculate_price_quantile(data: pd.DataFrame, period: int = 100) -> float:
|
|
"""计算价格分位"""
|
|
prices = data['close'].tail(period)
|
|
current_price = prices.iloc[-1]
|
|
quantile = (prices <= current_price).sum() / len(prices)
|
|
|
|
return quantile
|
|
|
|
|
|
def calculate_volume_price_strength(data: pd.DataFrame, period: int = 20) -> float:
|
|
"""计算量价强度"""
|
|
df = data.tail(period).copy()
|
|
df['price_change'] = df['close'].pct_change()
|
|
df['volume_change'] = df['volume'].pct_change()
|
|
|
|
# 量价配合度
|
|
strength = 0
|
|
for i in range(1, len(df)):
|
|
if (df['price_change'].iloc[i] > 0 and df['volume_change'].iloc[i] > 0) or \
|
|
(df['price_change'].iloc[i] < 0 and df['volume_change'].iloc[i] < 0):
|
|
strength += abs(df['price_change'].iloc[i]) * (1 + abs(df['volume_change'].iloc[i]))
|
|
else:
|
|
strength -= abs(df['price_change'].iloc[i]) * (1 + abs(df['volume_change'].iloc[i]))
|
|
|
|
# 归一化到0-100
|
|
max_strength = abs(strength)
|
|
if max_strength == 0:
|
|
return 50
|
|
|
|
normalized_strength = (strength / max_strength + 1) / 2 * 100
|
|
|
|
return normalized_strength
|