You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

484 lines
17 KiB

# 换月预警模块
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
class RolloverDetector:
"""换月预警检测器"""
def __init__(self):
pass
def analyze_rollover(self, symbol: str, data: pd.DataFrame, contract_info: Optional[Dict] = None) -> Dict:
"""分析换月情况"""
result = {}
# 检测交割日
delivery_info = self._detect_delivery_date(symbol, contract_info)
result.update(delivery_info)
# 分析流动性
liquidity_analysis = self._analyze_liquidity(data)
result.update(liquidity_analysis)
# 分析价差
spread_analysis = self._analyze_spread(symbol)
result.update(spread_analysis)
# 生成换月预警
rollover_warning = self._generate_rollover_warning(delivery_info, liquidity_analysis)
result['rollover_warning'] = rollover_warning
# 生成减仓建议
position_adjustment = self._generate_position_adjustment(delivery_info, liquidity_analysis)
result['position_adjustment'] = position_adjustment
return result
def _detect_delivery_date(self, symbol: str, contract_info: Optional[Dict] = None) -> Dict:
"""检测交割日"""
if contract_info and 'expire_datetime' in contract_info:
# 使用合约信息中的交割日
expire_timestamp = contract_info['expire_datetime']
if isinstance(expire_timestamp, int):
# 处理纳秒时间戳
if expire_timestamp > 1e15:
expire_date = datetime.fromtimestamp(expire_timestamp / 1e9)
else:
expire_date = datetime.fromtimestamp(expire_timestamp)
else:
expire_date = pd.to_datetime(expire_timestamp)
else:
# 基于合约代码推断交割日
expire_date = self._infer_delivery_date(symbol)
# 计算距离交割日的天数
today = datetime.now()
days_to_delivery = (expire_date - today).days
# 确定换月预警级别
warning_level = self._calculate_warning_level(days_to_delivery)
return {
'expire_date': expire_date.strftime('%Y-%m-%d'),
'days_to_delivery': days_to_delivery,
'warning_level': warning_level
}
def _infer_delivery_date(self, symbol: str) -> datetime:
"""基于合约代码推断交割日"""
# 简化的合约代码解析
# 假设合约代码格式为:品种+年份+月份,如 'CU2309'
try:
# 提取年份和月份
year_str = symbol[-4:-2]
month_str = symbol[-2:]
# 构建年份(加上世纪)
year = 2000 + int(year_str)
month = int(month_str)
# 假设交割日为合约月份的15日
# 实际交割日可能因品种而异,这里使用简化处理
expire_date = datetime(year, month, 15)
return expire_date
except Exception:
# 如果解析失败返回60天后的日期
return datetime.now() + timedelta(days=60)
def _calculate_warning_level(self, days_to_delivery: int) -> str:
"""计算换月预警级别"""
if days_to_delivery <= 3:
return 'critical'
elif days_to_delivery <= 7:
return 'high'
elif days_to_delivery <= 15:
return 'medium'
elif days_to_delivery <= 30:
return 'low'
else:
return 'none'
def _analyze_liquidity(self, data: pd.DataFrame) -> Dict:
"""分析流动性"""
# 计算成交量指标
avg_volume = data['volume'].tail(20).mean()
volume_trend = self._analyze_volume_trend(data['volume'])
# 计算持仓量指标
avg_open_interest = data['open_interest'].tail(20).mean()
oi_trend = self._analyze_oi_trend(data['open_interest'])
# 计算买卖价差(简化处理)
# 实际应该使用Tick数据计算
bid_ask_spread = self._estimate_bid_ask_spread(data)
# 计算流动性评分
liquidity_score = self._calculate_liquidity_score(avg_volume, volume_trend, avg_open_interest, oi_trend, bid_ask_spread)
# 确定流动性风险级别
liquidity_risk = self._calculate_liquidity_risk(liquidity_score)
return {
'avg_volume': avg_volume,
'volume_trend': volume_trend,
'avg_open_interest': avg_open_interest,
'oi_trend': oi_trend,
'bid_ask_spread': bid_ask_spread,
'liquidity_score': liquidity_score,
'liquidity_risk': liquidity_risk
}
def _analyze_volume_trend(self, volume_series: pd.Series) -> str:
"""分析成交量趋势"""
if len(volume_series) < 10:
return 'stable'
# 计算短期和长期移动平均线
short_ma = volume_series.tail(10).mean()
long_ma = volume_series.tail(30).mean()
if short_ma > long_ma * 1.1:
return 'increasing'
elif short_ma < long_ma * 0.9:
return 'decreasing'
else:
return 'stable'
def _analyze_oi_trend(self, oi_series: pd.Series) -> str:
"""分析持仓量趋势"""
if len(oi_series) < 10:
return 'stable'
# 计算短期和长期移动平均线
short_ma = oi_series.tail(10).mean()
long_ma = oi_series.tail(30).mean()
if short_ma > long_ma * 1.1:
return 'increasing'
elif short_ma < long_ma * 0.9:
return 'decreasing'
else:
return 'stable'
def _estimate_bid_ask_spread(self, data: pd.DataFrame) -> float:
"""估算买卖价差"""
# 简化处理,使用收盘价的波动来估算
price_volatility = data['close'].tail(20).std()
# 假设价差为波动率的10%
return price_volatility * 0.1
def _calculate_liquidity_score(self, avg_volume: float, volume_trend: str,
avg_open_interest: float, oi_trend: str,
bid_ask_spread: float) -> float:
"""计算流动性评分"""
# 基础分数
base_score = 100
# 成交量因素
if avg_volume < 1000:
base_score -= 30
elif avg_volume < 5000:
base_score -= 15
# 成交量趋势因素
if volume_trend == 'decreasing':
base_score -= 20
elif volume_trend == 'increasing':
base_score += 10
# 持仓量因素
if avg_open_interest < 5000:
base_score -= 20
elif avg_open_interest < 20000:
base_score -= 10
# 持仓量趋势因素
if oi_trend == 'decreasing':
base_score -= 15
elif oi_trend == 'increasing':
base_score += 5
# 买卖价差因素
if bid_ask_spread > 0.5:
base_score -= 25
elif bid_ask_spread > 0.2:
base_score -= 10
# 确保分数在0-100之间
return max(0, min(100, base_score))
def _calculate_liquidity_risk(self, liquidity_score: float) -> str:
"""计算流动性风险"""
if liquidity_score < 30:
return 'high'
elif liquidity_score < 60:
return 'medium'
else:
return 'low'
def _analyze_spread(self, symbol: str) -> Dict:
"""分析价差"""
# 简化处理,实际应该比较当前合约和下一个合约的价差
# 这里返回模拟数据
return {
'current_next_spread': 5.2,
'spread_trend': 'stable',
'spread_ratio': 0.0015
}
def _generate_rollover_warning(self, delivery_info: Dict, liquidity_info: Dict) -> Dict:
"""生成换月预警"""
warning_level = delivery_info['warning_level']
liquidity_risk = liquidity_info['liquidity_risk']
# 综合预警
overall_warning = 'none'
if warning_level in ['critical', 'high'] or liquidity_risk == 'high':
overall_warning = 'high'
elif warning_level == 'medium' or liquidity_risk == 'medium':
overall_warning = 'medium'
elif warning_level == 'low':
overall_warning = 'low'
# 预警信息
warning_message = self._generate_warning_message(warning_level, liquidity_risk)
# 建议操作
recommended_actions = self._generate_recommended_actions(warning_level, liquidity_risk)
return {
'overall_warning': overall_warning,
'warning_message': warning_message,
'recommended_actions': recommended_actions
}
def _generate_warning_message(self, warning_level: str, liquidity_risk: str) -> str:
"""生成预警信息"""
messages = []
if warning_level == 'critical':
messages.append('合约即将到期距离交割日不足3天')
elif warning_level == 'high':
messages.append('合约接近到期距离交割日不足7天')
elif warning_level == 'medium':
messages.append('合约距离交割日不足15天建议开始关注换月')
if liquidity_risk == 'high':
messages.append('流动性风险较高,可能影响交易执行')
elif liquidity_risk == 'medium':
messages.append('流动性风险中等,建议谨慎交易')
if not messages:
return '合约状态正常,无需特殊关注'
return '; '.join(messages)
def _generate_recommended_actions(self, warning_level: str, liquidity_risk: str) -> List[str]:
"""生成建议操作"""
actions = []
if warning_level in ['critical', 'high']:
actions.append('立即开始换月操作')
actions.append('逐步减仓当前合约')
actions.append('在新合约建立相应仓位')
elif warning_level == 'medium':
actions.append('开始评估换月时机')
actions.append('关注新合约流动性')
if liquidity_risk == 'high':
actions.append('减小单笔交易规模')
actions.append('使用限价单而非市价单')
actions.append('考虑提前换月')
return actions
def _generate_position_adjustment(self, delivery_info: Dict, liquidity_info: Dict) -> Dict:
"""生成仓位调整建议"""
days_to_delivery = delivery_info['days_to_delivery']
warning_level = delivery_info['warning_level']
liquidity_risk = liquidity_info['liquidity_risk']
# 计算减仓比例
reduction_ratio = self._calculate_reduction_ratio(days_to_delivery, warning_level, liquidity_risk)
# 计算建议的减仓时间表
reduction_schedule = self._generate_reduction_schedule(days_to_delivery, reduction_ratio)
# 计算新合约建仓建议
new_contract_adjustment = self._generate_new_contract_adjustment(reduction_ratio)
return {
'reduction_ratio': reduction_ratio,
'reduction_schedule': reduction_schedule,
'new_contract_adjustment': new_contract_adjustment
}
def _calculate_reduction_ratio(self, days_to_delivery: int, warning_level: str, liquidity_risk: str) -> float:
"""计算减仓比例"""
# 基础减仓比例
base_ratio = 0.0
if warning_level == 'critical':
base_ratio = 0.9 # 减仓90%
elif warning_level == 'high':
base_ratio = 0.7 # 减仓70%
elif warning_level == 'medium':
base_ratio = 0.4 # 减仓40%
elif warning_level == 'low':
base_ratio = 0.2 # 减仓20%
# 流动性风险调整
if liquidity_risk == 'high':
base_ratio = min(1.0, base_ratio + 0.2)
elif liquidity_risk == 'medium':
base_ratio = min(1.0, base_ratio + 0.1)
return base_ratio
def _generate_reduction_schedule(self, days_to_delivery: int, reduction_ratio: float) -> List[Dict]:
"""生成减仓时间表"""
schedule = []
if days_to_delivery <= 3:
# 紧急减仓
schedule.append({
'timeframe': '今日',
'reduction_ratio': reduction_ratio
})
elif days_to_delivery <= 7:
# 快速减仓
daily_ratio = reduction_ratio / 3
for i in range(3):
schedule.append({
'timeframe': f'{i+1}天内',
'reduction_ratio': daily_ratio
})
elif days_to_delivery <= 15:
# 逐步减仓
daily_ratio = reduction_ratio / 5
for i in range(5):
schedule.append({
'timeframe': f'{i+1}天内',
'reduction_ratio': daily_ratio
})
elif days_to_delivery <= 30:
# 缓慢减仓
weekly_ratio = reduction_ratio / 2
schedule.append({
'timeframe': '第一周',
'reduction_ratio': weekly_ratio
})
schedule.append({
'timeframe': '第二周',
'reduction_ratio': weekly_ratio
})
return schedule
def _generate_new_contract_adjustment(self, reduction_ratio: float) -> Dict:
"""生成新合约建仓建议"""
# 建议在新合约建立与原合约相同方向的仓位
# 建仓比例应与减仓比例对应
return {
'direction': 'same_as_current',
'target_ratio': reduction_ratio,
'execution_strategy': 'gradual',
'considerations': [
'关注新合约流动性',
'注意合约间价差',
'避免在换月高峰期交易'
]
}
def monitor_rollover_risk(self, symbol: str, data: pd.DataFrame, position_size: float) -> Dict:
"""监控换月风险"""
# 分析换月情况
rollover_analysis = self.analyze_rollover(symbol, data)
# 计算风险暴露
risk_exposure = self._calculate_risk_exposure(position_size, rollover_analysis)
# 生成风险报告
risk_report = {
'symbol': symbol,
'position_size': position_size,
'rollover_analysis': rollover_analysis,
'risk_exposure': risk_exposure,
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
return risk_report
def _calculate_risk_exposure(self, position_size: float, rollover_analysis: Dict) -> Dict:
"""计算风险暴露"""
warning_level = rollover_analysis['warning_level']
liquidity_risk = rollover_analysis.get('liquidity_risk', 'low')
# 基础风险分数
base_risk = 0
if warning_level == 'critical':
base_risk = 90
elif warning_level == 'high':
base_risk = 70
elif warning_level == 'medium':
base_risk = 40
elif warning_level == 'low':
base_risk = 20
# 流动性风险调整
if liquidity_risk == 'high':
base_risk += 20
elif liquidity_risk == 'medium':
base_risk += 10
# 仓位大小调整
if position_size > 10:
base_risk += 15
elif position_size > 5:
base_risk += 5
# 确保风险分数在0-100之间
risk_score = max(0, min(100, base_risk))
# 风险等级
risk_level = 'low'
if risk_score >= 80:
risk_level = 'critical'
elif risk_score >= 60:
risk_level = 'high'
elif risk_score >= 30:
risk_level = 'medium'
return {
'risk_score': risk_score,
'risk_level': risk_level,
'recommendations': self._generate_risk_recommendations(risk_level)
}
def _generate_risk_recommendations(self, risk_level: str) -> List[str]:
"""生成风险建议"""
recommendations = []
if risk_level == 'critical':
recommendations.append('立即减仓至最小仓位')
recommendations.append('优先处理换月操作')
recommendations.append('密切监控市场流动性')
elif risk_level == 'high':
recommendations.append('大幅减仓当前合约')
recommendations.append('加速换月进程')
recommendations.append('使用限价单控制交易成本')
elif risk_level == 'medium':
recommendations.append('开始有序减仓')
recommendations.append('评估换月时机')
recommendations.append('关注新合约表现')
else:
recommendations.append('保持正常交易')
recommendations.append('定期监控合约到期情况')
return recommendations