You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

154 lines
4.5 KiB

# Technical analysis tools
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple
def calculate_macd(data: pd.DataFrame, fast_period: int = 12, slow_period: int = 26, signal_period: int = 9) -> Dict[str, pd.Series]:
    """Compute the MACD line, its signal line and the histogram.

    Args:
        data: Frame containing a ``close`` column.
        fast_period: Span of the fast exponential moving average of ``close``.
        slow_period: Span of the slow exponential moving average of ``close``.
        signal_period: Span of the exponential moving average applied to the
            MACD line.

    Returns:
        Dict with ``macd`` (fast EMA minus slow EMA), ``signal`` (EMA of
        ``macd``) and ``histogram`` (``macd`` minus ``signal``).
    """

    def _ema(series: pd.Series, span: int) -> pd.Series:
        # Recursive (adjust=False) exponential moving average.
        return series.ewm(span=span, adjust=False).mean()

    close = data['close']
    macd_line = _ema(close, fast_period) - _ema(close, slow_period)
    signal_line = _ema(macd_line, signal_period)
    return {
        'macd': macd_line,
        'signal': signal_line,
        'histogram': macd_line - signal_line,
    }
def calculate_rsi(data: pd.DataFrame, period: int = 14) -> pd.Series:
    """Compute the RSI from simple rolling means of gains and losses.

    Args:
        data: Frame containing a ``close`` column.
        period: Rolling window length used to average gains and losses.

    Returns:
        Series of RSI values; the first ``period - 1`` rows are NaN.
    """
    change = data['close'].diff()
    # ``where`` keeps matching entries and writes 0 everywhere else, which
    # includes the undefined first difference.
    ups = change.where(change > 0, 0)
    downs = -change.where(change < 0, 0)
    avg_gain = ups.rolling(window=period).mean()
    avg_loss = downs.rolling(window=period).mean()
    relative_strength = avg_gain / avg_loss
    return 100 - (100 / (1 + relative_strength))
def calculate_bollinger_bands(data: pd.DataFrame, period: int = 20, std_dev: float = 2.0) -> Dict[str, pd.Series]:
    """Compute Bollinger Bands around a simple moving average of ``close``.

    Args:
        data: Frame containing a ``close`` column.
        period: Rolling window length for the mean and standard deviation.
        std_dev: Number of rolling standard deviations between the middle
            line and each band.

    Returns:
        Dict with ``sma`` (rolling mean), ``upper_band`` and ``lower_band``.
    """
    window = data['close'].rolling(window=period)
    middle = window.mean()
    offset = window.std() * std_dev
    return {
        'sma': middle,
        'upper_band': middle + offset,
        'lower_band': middle - offset,
    }
def calculate_kdj(data: pd.DataFrame, period: int = 9, signal_period: int = 3) -> Dict[str, pd.Series]:
    """Compute the KDJ oscillator lines.

    Args:
        data: Frame containing ``high``, ``low`` and ``close`` columns.
        period: Rolling window length for the lowest low and highest high.
        signal_period: Smoothing length; K and D are exponential moving
            averages with ``alpha = 1 / signal_period``.

    Returns:
        Dict with ``k`` (smoothed RSV), ``d`` (smoothed ``k``) and ``j``
        (``3 * k - 2 * d``).
    """
    lowest_low = data['low'].rolling(window=period).min()
    highest_high = data['high'].rolling(window=period).max()
    # Raw stochastic value: where close sits inside the recent range, in percent.
    raw_stochastic = (data['close'] - lowest_low) / (highest_high - lowest_low) * 100

    smoothing = 1 / signal_period
    k_line = raw_stochastic.ewm(alpha=smoothing, adjust=False).mean()
    d_line = k_line.ewm(alpha=smoothing, adjust=False).mean()
    return {
        'k': k_line,
        'd': d_line,
        'j': 3 * k_line - 2 * d_line,
    }
def calculate_adx(data: pd.DataFrame, period: int = 14) -> Dict[str, pd.Series]:
    """Compute ADX together with the +DI and -DI directional indicators.

    True range, directional movement and DX are all smoothed with simple
    rolling means of length ``period``.

    Directional movement follows Wilder's rule: on each bar only the larger
    of the up move (``high - previous high``) and the down move
    (``previous low - low``) counts, and only when it is positive. A bar
    that makes both a higher high and a lower low therefore contributes to
    one direction only, and equal moves contribute to neither.

    Args:
        data: Frame containing ``high``, ``low`` and ``close`` columns.
        period: Rolling window length used for every smoothing step.

    Returns:
        Dict with ``adx``, ``plus_di`` and ``minus_di`` series aligned to
        ``data``'s index. Leading rows without a full window are NaN.
    """
    high = data['high']
    low = data['low']
    prev_close = data['close'].shift()

    # Row-wise max skips NaN, so the first bar falls back to high - low.
    true_range = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1,
    ).max(axis=1)

    up_move = high.diff()
    down_move = -low.diff()
    # clip() drops negative moves; mask() zeroes the smaller of the two moves.
    # NaN comparisons are False, so the undefined first row stays NaN and the
    # warm-up length is unchanged.
    plus_dm = up_move.clip(lower=0).mask(up_move <= down_move, 0.0)
    minus_dm = down_move.clip(lower=0).mask(down_move <= up_move, 0.0)

    atr = true_range.rolling(window=period).mean()
    plus_di = (plus_dm.rolling(window=period).mean() / atr) * 100
    minus_di = (minus_dm.rolling(window=period).mean() / atr) * 100
    dx = ((plus_di - minus_di).abs() / (plus_di + minus_di)) * 100
    adx = dx.rolling(window=period).mean()
    return {
        'adx': adx,
        'plus_di': plus_di,
        'minus_di': minus_di,
    }
def calculate_atr(data: pd.DataFrame, period: int = 14) -> pd.Series:
    """Compute the average true range as a simple rolling mean.

    Args:
        data: Frame containing ``high``, ``low`` and ``close`` columns.
        period: Rolling window length for averaging the true range.

    Returns:
        Series of ATR values; the first ``period - 1`` rows are NaN.
    """
    prev_close = data['close'].shift()
    candidates = [
        data['high'] - data['low'],
        abs(data['high'] - prev_close),
        abs(data['low'] - prev_close),
    ]
    # Row-wise max skips NaN, so the first bar uses high - low alone.
    true_range = pd.concat(candidates, axis=1).max(axis=1)
    return true_range.rolling(window=period).mean()
def calculate_moving_average(data: pd.DataFrame, periods: List[int]) -> Dict[str, pd.Series]:
    """Compute simple moving averages of ``close`` for several window lengths.

    Args:
        data: Frame containing a ``close`` column.
        periods: Window lengths to compute.

    Returns:
        Dict keyed ``ma<period>`` (for example ``ma5``) holding the rolling
        mean for each requested period, in the order given.
    """
    return {
        f'ma{window}': data['close'].rolling(window=window).mean()
        for window in periods
    }
def calculate_price_quantile(data: pd.DataFrame, period: int = 100) -> float:
    """Return where the latest close ranks within the recent closes.

    Args:
        data: Frame containing a ``close`` column with at least one row.
        period: Number of most recent rows to compare against.

    Returns:
        Fraction of the last ``period`` closes that are less than or equal
        to the most recent close.
    """
    recent = data['close'].tail(period)
    latest = recent.iloc[-1]
    at_or_below = recent.le(latest).sum()
    return at_or_below / len(recent)
def calculate_volume_price_strength(data: pd.DataFrame, period: int = 20) -> float:
    """Score how well volume changes confirm price changes, on a 0-100 scale.

    Each bar in the last ``period`` rows (after the first, which has no
    previous bar inside the window) contributes
    ``|price change| * (1 + |volume change|)``. The contribution is positive
    when price and volume move in the same direction and negative otherwise.
    The signed total is divided by the total magnitude, so the result varies
    continuously: 100 when every bar confirms, 0 when none does, 50 when
    they balance.

    Bars whose price or volume change is not finite (for example when the
    previous volume is 0) are skipped so they cannot turn the score into NaN.

    Args:
        data: Frame containing ``close`` and ``volume`` columns. It is not
            modified.
        period: Number of most recent rows to evaluate.

    Returns:
        Score in ``[0, 100]``; ``50.0`` when there is nothing to measure
        (fewer than two rows, or no price movement at all).
    """
    window = data.tail(period)
    # Drop the first element: its change is undefined inside the window.
    price_change = window['close'].pct_change().to_numpy(dtype=float)[1:]
    volume_change = window['volume'].pct_change().to_numpy(dtype=float)[1:]

    usable = np.isfinite(price_change) & np.isfinite(volume_change)
    price_change = price_change[usable]
    volume_change = volume_change[usable]

    magnitude = np.abs(price_change) * (1 + np.abs(volume_change))
    confirming = ((price_change > 0) & (volume_change > 0)) | (
        (price_change < 0) & (volume_change < 0)
    )
    strength = np.where(confirming, magnitude, -magnitude).sum()

    # Normalise against the largest score these bars could have produced,
    # not against |strength| itself, which would only ever give 0 or 100.
    max_strength = magnitude.sum()
    if max_strength == 0:
        return 50.0
    return float((strength / max_strength + 1) / 2 * 100)