# 技术分析工具 import numpy as np import pandas as pd from typing import Dict, List, Tuple def calculate_macd(data: pd.DataFrame, fast_period: int = 12, slow_period: int = 26, signal_period: int = 9) -> Dict[str, pd.Series]: """计算MACD指标""" exp1 = data['close'].ewm(span=fast_period, adjust=False).mean() exp2 = data['close'].ewm(span=slow_period, adjust=False).mean() macd = exp1 - exp2 signal = macd.ewm(span=signal_period, adjust=False).mean() histogram = macd - signal return { 'macd': macd, 'signal': signal, 'histogram': histogram } def calculate_rsi(data: pd.DataFrame, period: int = 14) -> pd.Series: """计算RSI指标""" delta = data['close'].diff() gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() rs = gain / loss rsi = 100 - (100 / (1 + rs)) return rsi def calculate_bollinger_bands(data: pd.DataFrame, period: int = 20, std_dev: float = 2.0) -> Dict[str, pd.Series]: """计算布林带""" sma = data['close'].rolling(window=period).mean() std = data['close'].rolling(window=period).std() upper_band = sma + (std * std_dev) lower_band = sma - (std * std_dev) return { 'sma': sma, 'upper_band': upper_band, 'lower_band': lower_band } def calculate_kdj(data: pd.DataFrame, period: int = 9, signal_period: int = 3) -> Dict[str, pd.Series]: """计算KDJ指标""" low_min = data['low'].rolling(window=period).min() high_max = data['high'].rolling(window=period).max() rsv = (data['close'] - low_min) / (high_max - low_min) * 100 k = rsv.ewm(alpha=1/signal_period, adjust=False).mean() d = k.ewm(alpha=1/signal_period, adjust=False).mean() j = 3 * k - 2 * d return { 'k': k, 'd': d, 'j': j } def calculate_adx(data: pd.DataFrame, period: int = 14) -> Dict[str, pd.Series]: """计算ADX指标""" high = data['high'] low = data['low'] close = data['close'] tr1 = high - low tr2 = abs(high - close.shift()) tr3 = abs(low - close.shift()) tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) plus_dm = high.diff() minus_dm = low.diff() plus_dm[plus_dm < 0] = 0 minus_dm[minus_dm > 0] = 0 minus_dm = abs(minus_dm) atr = tr.rolling(window=period).mean() plus_di = (plus_dm.rolling(window=period).mean() / atr) * 100 minus_di = (minus_dm.rolling(window=period).mean() / atr) * 100 dx = (abs(plus_di - minus_di) / (plus_di + minus_di)) * 100 adx = dx.rolling(window=period).mean() return { 'adx': adx, 'plus_di': plus_di, 'minus_di': minus_di } def calculate_atr(data: pd.DataFrame, period: int = 14) -> pd.Series: """计算ATR指标""" high = data['high'] low = data['low'] close = data['close'] tr1 = high - low tr2 = abs(high - close.shift()) tr3 = abs(low - close.shift()) tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) atr = tr.rolling(window=period).mean() return atr def calculate_moving_average(data: pd.DataFrame, periods: List[int]) -> Dict[str, pd.Series]: """计算移动平均线""" mas = {} for period in periods: mas[f'ma{period}'] = data['close'].rolling(window=period).mean() return mas def calculate_price_quantile(data: pd.DataFrame, period: int = 100) -> float: """计算价格分位""" prices = data['close'].tail(period) current_price = prices.iloc[-1] quantile = (prices <= current_price).sum() / len(prices) return quantile def calculate_volume_price_strength(data: pd.DataFrame, period: int = 20) -> float: """计算量价强度""" df = data.tail(period).copy() df['price_change'] = df['close'].pct_change() df['volume_change'] = df['volume'].pct_change() # 量价配合度 strength = 0 for i in range(1, len(df)): if (df['price_change'].iloc[i] > 0 and df['volume_change'].iloc[i] > 0) or \ (df['price_change'].iloc[i] < 0 and df['volume_change'].iloc[i] < 0): strength += abs(df['price_change'].iloc[i]) * (1 + abs(df['volume_change'].iloc[i])) else: strength -= abs(df['price_change'].iloc[i]) * (1 + abs(df['volume_change'].iloc[i])) # 归一化到0-100 max_strength = abs(strength) if max_strength == 0: return 50 normalized_strength = (strength / max_strength + 1) / 2 * 100 return normalized_strength