# 换月预警模块 import pandas as pd import numpy as np from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple class RolloverDetector: """换月预警检测器""" def __init__(self): pass def analyze_rollover(self, symbol: str, data: pd.DataFrame, contract_info: Optional[Dict] = None) -> Dict: """分析换月情况""" result = {} # 检测交割日 delivery_info = self._detect_delivery_date(symbol, contract_info) result.update(delivery_info) # 分析流动性 liquidity_analysis = self._analyze_liquidity(data) result.update(liquidity_analysis) # 分析价差 spread_analysis = self._analyze_spread(symbol) result.update(spread_analysis) # 生成换月预警 rollover_warning = self._generate_rollover_warning(delivery_info, liquidity_analysis) result['rollover_warning'] = rollover_warning # 生成减仓建议 position_adjustment = self._generate_position_adjustment(delivery_info, liquidity_analysis) result['position_adjustment'] = position_adjustment return result def _detect_delivery_date(self, symbol: str, contract_info: Optional[Dict] = None) -> Dict: """检测交割日""" if contract_info and 'expire_datetime' in contract_info: # 使用合约信息中的交割日 expire_timestamp = contract_info['expire_datetime'] if isinstance(expire_timestamp, int): # 处理纳秒时间戳 if expire_timestamp > 1e15: expire_date = datetime.fromtimestamp(expire_timestamp / 1e9) else: expire_date = datetime.fromtimestamp(expire_timestamp) else: expire_date = pd.to_datetime(expire_timestamp) else: # 基于合约代码推断交割日 expire_date = self._infer_delivery_date(symbol) # 计算距离交割日的天数 today = datetime.now() days_to_delivery = (expire_date - today).days # 确定换月预警级别 warning_level = self._calculate_warning_level(days_to_delivery) return { 'expire_date': expire_date.strftime('%Y-%m-%d'), 'days_to_delivery': days_to_delivery, 'warning_level': warning_level } def _infer_delivery_date(self, symbol: str) -> datetime: """基于合约代码推断交割日""" # 简化的合约代码解析 # 假设合约代码格式为:品种+年份+月份,如 'CU2309' try: # 提取年份和月份 year_str = symbol[-4:-2] month_str = symbol[-2:] # 构建年份(加上世纪) year = 2000 + int(year_str) month = int(month_str) # 假设交割日为合约月份的15日 # 实际交割日可能因品种而异,这里使用简化处理 expire_date = datetime(year, month, 15) return expire_date except Exception: # 如果解析失败,返回60天后的日期 return datetime.now() + timedelta(days=60) def _calculate_warning_level(self, days_to_delivery: int) -> str: """计算换月预警级别""" if days_to_delivery <= 3: return 'critical' elif days_to_delivery <= 7: return 'high' elif days_to_delivery <= 15: return 'medium' elif days_to_delivery <= 30: return 'low' else: return 'none' def _analyze_liquidity(self, data: pd.DataFrame) -> Dict: """分析流动性""" # 计算成交量指标 avg_volume = data['volume'].tail(20).mean() volume_trend = self._analyze_volume_trend(data['volume']) # 计算持仓量指标 avg_open_interest = data['open_interest'].tail(20).mean() oi_trend = self._analyze_oi_trend(data['open_interest']) # 计算买卖价差(简化处理) # 实际应该使用Tick数据计算 bid_ask_spread = self._estimate_bid_ask_spread(data) # 计算流动性评分 liquidity_score = self._calculate_liquidity_score(avg_volume, volume_trend, avg_open_interest, oi_trend, bid_ask_spread) # 确定流动性风险级别 liquidity_risk = self._calculate_liquidity_risk(liquidity_score) return { 'avg_volume': avg_volume, 'volume_trend': volume_trend, 'avg_open_interest': avg_open_interest, 'oi_trend': oi_trend, 'bid_ask_spread': bid_ask_spread, 'liquidity_score': liquidity_score, 'liquidity_risk': liquidity_risk } def _analyze_volume_trend(self, volume_series: pd.Series) -> str: """分析成交量趋势""" if len(volume_series) < 10: return 'stable' # 计算短期和长期移动平均线 short_ma = volume_series.tail(10).mean() long_ma = volume_series.tail(30).mean() if short_ma > long_ma * 1.1: return 'increasing' elif short_ma < long_ma * 0.9: return 'decreasing' else: return 'stable' def _analyze_oi_trend(self, oi_series: pd.Series) -> str: """分析持仓量趋势""" if len(oi_series) < 10: return 'stable' # 计算短期和长期移动平均线 short_ma = oi_series.tail(10).mean() long_ma = oi_series.tail(30).mean() if short_ma > long_ma * 1.1: return 'increasing' elif short_ma < long_ma * 0.9: return 'decreasing' else: return 'stable' def _estimate_bid_ask_spread(self, data: pd.DataFrame) -> float: """估算买卖价差""" # 简化处理,使用收盘价的波动来估算 price_volatility = data['close'].tail(20).std() # 假设价差为波动率的10% return price_volatility * 0.1 def _calculate_liquidity_score(self, avg_volume: float, volume_trend: str, avg_open_interest: float, oi_trend: str, bid_ask_spread: float) -> float: """计算流动性评分""" # 基础分数 base_score = 100 # 成交量因素 if avg_volume < 1000: base_score -= 30 elif avg_volume < 5000: base_score -= 15 # 成交量趋势因素 if volume_trend == 'decreasing': base_score -= 20 elif volume_trend == 'increasing': base_score += 10 # 持仓量因素 if avg_open_interest < 5000: base_score -= 20 elif avg_open_interest < 20000: base_score -= 10 # 持仓量趋势因素 if oi_trend == 'decreasing': base_score -= 15 elif oi_trend == 'increasing': base_score += 5 # 买卖价差因素 if bid_ask_spread > 0.5: base_score -= 25 elif bid_ask_spread > 0.2: base_score -= 10 # 确保分数在0-100之间 return max(0, min(100, base_score)) def _calculate_liquidity_risk(self, liquidity_score: float) -> str: """计算流动性风险""" if liquidity_score < 30: return 'high' elif liquidity_score < 60: return 'medium' else: return 'low' def _analyze_spread(self, symbol: str) -> Dict: """分析价差""" # 简化处理,实际应该比较当前合约和下一个合约的价差 # 这里返回模拟数据 return { 'current_next_spread': 5.2, 'spread_trend': 'stable', 'spread_ratio': 0.0015 } def _generate_rollover_warning(self, delivery_info: Dict, liquidity_info: Dict) -> Dict: """生成换月预警""" warning_level = delivery_info['warning_level'] liquidity_risk = liquidity_info['liquidity_risk'] # 综合预警 overall_warning = 'none' if warning_level in ['critical', 'high'] or liquidity_risk == 'high': overall_warning = 'high' elif warning_level == 'medium' or liquidity_risk == 'medium': overall_warning = 'medium' elif warning_level == 'low': overall_warning = 'low' # 预警信息 warning_message = self._generate_warning_message(warning_level, liquidity_risk) # 建议操作 recommended_actions = self._generate_recommended_actions(warning_level, liquidity_risk) return { 'overall_warning': overall_warning, 'warning_message': warning_message, 'recommended_actions': recommended_actions } def _generate_warning_message(self, warning_level: str, liquidity_risk: str) -> str: """生成预警信息""" messages = [] if warning_level == 'critical': messages.append('合约即将到期,距离交割日不足3天') elif warning_level == 'high': messages.append('合约接近到期,距离交割日不足7天') elif warning_level == 'medium': messages.append('合约距离交割日不足15天,建议开始关注换月') if liquidity_risk == 'high': messages.append('流动性风险较高,可能影响交易执行') elif liquidity_risk == 'medium': messages.append('流动性风险中等,建议谨慎交易') if not messages: return '合约状态正常,无需特殊关注' return '; '.join(messages) def _generate_recommended_actions(self, warning_level: str, liquidity_risk: str) -> List[str]: """生成建议操作""" actions = [] if warning_level in ['critical', 'high']: actions.append('立即开始换月操作') actions.append('逐步减仓当前合约') actions.append('在新合约建立相应仓位') elif warning_level == 'medium': actions.append('开始评估换月时机') actions.append('关注新合约流动性') if liquidity_risk == 'high': actions.append('减小单笔交易规模') actions.append('使用限价单而非市价单') actions.append('考虑提前换月') return actions def _generate_position_adjustment(self, delivery_info: Dict, liquidity_info: Dict) -> Dict: """生成仓位调整建议""" days_to_delivery = delivery_info['days_to_delivery'] warning_level = delivery_info['warning_level'] liquidity_risk = liquidity_info['liquidity_risk'] # 计算减仓比例 reduction_ratio = self._calculate_reduction_ratio(days_to_delivery, warning_level, liquidity_risk) # 计算建议的减仓时间表 reduction_schedule = self._generate_reduction_schedule(days_to_delivery, reduction_ratio) # 计算新合约建仓建议 new_contract_adjustment = self._generate_new_contract_adjustment(reduction_ratio) return { 'reduction_ratio': reduction_ratio, 'reduction_schedule': reduction_schedule, 'new_contract_adjustment': new_contract_adjustment } def _calculate_reduction_ratio(self, days_to_delivery: int, warning_level: str, liquidity_risk: str) -> float: """计算减仓比例""" # 基础减仓比例 base_ratio = 0.0 if warning_level == 'critical': base_ratio = 0.9 # 减仓90% elif warning_level == 'high': base_ratio = 0.7 # 减仓70% elif warning_level == 'medium': base_ratio = 0.4 # 减仓40% elif warning_level == 'low': base_ratio = 0.2 # 减仓20% # 流动性风险调整 if liquidity_risk == 'high': base_ratio = min(1.0, base_ratio + 0.2) elif liquidity_risk == 'medium': base_ratio = min(1.0, base_ratio + 0.1) return base_ratio def _generate_reduction_schedule(self, days_to_delivery: int, reduction_ratio: float) -> List[Dict]: """生成减仓时间表""" schedule = [] if days_to_delivery <= 3: # 紧急减仓 schedule.append({ 'timeframe': '今日', 'reduction_ratio': reduction_ratio }) elif days_to_delivery <= 7: # 快速减仓 daily_ratio = reduction_ratio / 3 for i in range(3): schedule.append({ 'timeframe': f'{i+1}天内', 'reduction_ratio': daily_ratio }) elif days_to_delivery <= 15: # 逐步减仓 daily_ratio = reduction_ratio / 5 for i in range(5): schedule.append({ 'timeframe': f'{i+1}天内', 'reduction_ratio': daily_ratio }) elif days_to_delivery <= 30: # 缓慢减仓 weekly_ratio = reduction_ratio / 2 schedule.append({ 'timeframe': '第一周', 'reduction_ratio': weekly_ratio }) schedule.append({ 'timeframe': '第二周', 'reduction_ratio': weekly_ratio }) return schedule def _generate_new_contract_adjustment(self, reduction_ratio: float) -> Dict: """生成新合约建仓建议""" # 建议在新合约建立与原合约相同方向的仓位 # 建仓比例应与减仓比例对应 return { 'direction': 'same_as_current', 'target_ratio': reduction_ratio, 'execution_strategy': 'gradual', 'considerations': [ '关注新合约流动性', '注意合约间价差', '避免在换月高峰期交易' ] } def monitor_rollover_risk(self, symbol: str, data: pd.DataFrame, position_size: float) -> Dict: """监控换月风险""" # 分析换月情况 rollover_analysis = self.analyze_rollover(symbol, data) # 计算风险暴露 risk_exposure = self._calculate_risk_exposure(position_size, rollover_analysis) # 生成风险报告 risk_report = { 'symbol': symbol, 'position_size': position_size, 'rollover_analysis': rollover_analysis, 'risk_exposure': risk_exposure, 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } return risk_report def _calculate_risk_exposure(self, position_size: float, rollover_analysis: Dict) -> Dict: """计算风险暴露""" warning_level = rollover_analysis['warning_level'] liquidity_risk = rollover_analysis.get('liquidity_risk', 'low') # 基础风险分数 base_risk = 0 if warning_level == 'critical': base_risk = 90 elif warning_level == 'high': base_risk = 70 elif warning_level == 'medium': base_risk = 40 elif warning_level == 'low': base_risk = 20 # 流动性风险调整 if liquidity_risk == 'high': base_risk += 20 elif liquidity_risk == 'medium': base_risk += 10 # 仓位大小调整 if position_size > 10: base_risk += 15 elif position_size > 5: base_risk += 5 # 确保风险分数在0-100之间 risk_score = max(0, min(100, base_risk)) # 风险等级 risk_level = 'low' if risk_score >= 80: risk_level = 'critical' elif risk_score >= 60: risk_level = 'high' elif risk_score >= 30: risk_level = 'medium' return { 'risk_score': risk_score, 'risk_level': risk_level, 'recommendations': self._generate_risk_recommendations(risk_level) } def _generate_risk_recommendations(self, risk_level: str) -> List[str]: """生成风险建议""" recommendations = [] if risk_level == 'critical': recommendations.append('立即减仓至最小仓位') recommendations.append('优先处理换月操作') recommendations.append('密切监控市场流动性') elif risk_level == 'high': recommendations.append('大幅减仓当前合约') recommendations.append('加速换月进程') recommendations.append('使用限价单控制交易成本') elif risk_level == 'medium': recommendations.append('开始有序减仓') recommendations.append('评估换月时机') recommendations.append('关注新合约表现') else: recommendations.append('保持正常交易') recommendations.append('定期监控合约到期情况') return recommendations