You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

484 lines
17 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# 换月预警模块
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
class RolloverDetector:
"""换月预警检测器"""
def __init__(self):
pass
def analyze_rollover(self, symbol: str, data: pd.DataFrame, contract_info: Optional[Dict] = None) -> Dict:
"""分析换月情况"""
result = {}
# 检测交割日
delivery_info = self._detect_delivery_date(symbol, contract_info)
result.update(delivery_info)
# 分析流动性
liquidity_analysis = self._analyze_liquidity(data)
result.update(liquidity_analysis)
# 分析价差
spread_analysis = self._analyze_spread(symbol)
result.update(spread_analysis)
# 生成换月预警
rollover_warning = self._generate_rollover_warning(delivery_info, liquidity_analysis)
result['rollover_warning'] = rollover_warning
# 生成减仓建议
position_adjustment = self._generate_position_adjustment(delivery_info, liquidity_analysis)
result['position_adjustment'] = position_adjustment
return result
def _detect_delivery_date(self, symbol: str, contract_info: Optional[Dict] = None) -> Dict:
"""检测交割日"""
if contract_info and 'expire_datetime' in contract_info:
# 使用合约信息中的交割日
expire_timestamp = contract_info['expire_datetime']
if isinstance(expire_timestamp, int):
# 处理纳秒时间戳
if expire_timestamp > 1e15:
expire_date = datetime.fromtimestamp(expire_timestamp / 1e9)
else:
expire_date = datetime.fromtimestamp(expire_timestamp)
else:
expire_date = pd.to_datetime(expire_timestamp)
else:
# 基于合约代码推断交割日
expire_date = self._infer_delivery_date(symbol)
# 计算距离交割日的天数
today = datetime.now()
days_to_delivery = (expire_date - today).days
# 确定换月预警级别
warning_level = self._calculate_warning_level(days_to_delivery)
return {
'expire_date': expire_date.strftime('%Y-%m-%d'),
'days_to_delivery': days_to_delivery,
'warning_level': warning_level
}
def _infer_delivery_date(self, symbol: str) -> datetime:
"""基于合约代码推断交割日"""
# 简化的合约代码解析
# 假设合约代码格式为:品种+年份+月份,如 'CU2309'
try:
# 提取年份和月份
year_str = symbol[-4:-2]
month_str = symbol[-2:]
# 构建年份(加上世纪)
year = 2000 + int(year_str)
month = int(month_str)
# 假设交割日为合约月份的15日
# 实际交割日可能因品种而异,这里使用简化处理
expire_date = datetime(year, month, 15)
return expire_date
except Exception:
# 如果解析失败返回60天后的日期
return datetime.now() + timedelta(days=60)
def _calculate_warning_level(self, days_to_delivery: int) -> str:
"""计算换月预警级别"""
if days_to_delivery <= 3:
return 'critical'
elif days_to_delivery <= 7:
return 'high'
elif days_to_delivery <= 15:
return 'medium'
elif days_to_delivery <= 30:
return 'low'
else:
return 'none'
def _analyze_liquidity(self, data: pd.DataFrame) -> Dict:
"""分析流动性"""
# 计算成交量指标
avg_volume = data['volume'].tail(20).mean()
volume_trend = self._analyze_volume_trend(data['volume'])
# 计算持仓量指标
avg_open_interest = data['open_interest'].tail(20).mean()
oi_trend = self._analyze_oi_trend(data['open_interest'])
# 计算买卖价差(简化处理)
# 实际应该使用Tick数据计算
bid_ask_spread = self._estimate_bid_ask_spread(data)
# 计算流动性评分
liquidity_score = self._calculate_liquidity_score(avg_volume, volume_trend, avg_open_interest, oi_trend, bid_ask_spread)
# 确定流动性风险级别
liquidity_risk = self._calculate_liquidity_risk(liquidity_score)
return {
'avg_volume': avg_volume,
'volume_trend': volume_trend,
'avg_open_interest': avg_open_interest,
'oi_trend': oi_trend,
'bid_ask_spread': bid_ask_spread,
'liquidity_score': liquidity_score,
'liquidity_risk': liquidity_risk
}
def _analyze_volume_trend(self, volume_series: pd.Series) -> str:
"""分析成交量趋势"""
if len(volume_series) < 10:
return 'stable'
# 计算短期和长期移动平均线
short_ma = volume_series.tail(10).mean()
long_ma = volume_series.tail(30).mean()
if short_ma > long_ma * 1.1:
return 'increasing'
elif short_ma < long_ma * 0.9:
return 'decreasing'
else:
return 'stable'
def _analyze_oi_trend(self, oi_series: pd.Series) -> str:
"""分析持仓量趋势"""
if len(oi_series) < 10:
return 'stable'
# 计算短期和长期移动平均线
short_ma = oi_series.tail(10).mean()
long_ma = oi_series.tail(30).mean()
if short_ma > long_ma * 1.1:
return 'increasing'
elif short_ma < long_ma * 0.9:
return 'decreasing'
else:
return 'stable'
def _estimate_bid_ask_spread(self, data: pd.DataFrame) -> float:
"""估算买卖价差"""
# 简化处理,使用收盘价的波动来估算
price_volatility = data['close'].tail(20).std()
# 假设价差为波动率的10%
return price_volatility * 0.1
def _calculate_liquidity_score(self, avg_volume: float, volume_trend: str,
avg_open_interest: float, oi_trend: str,
bid_ask_spread: float) -> float:
"""计算流动性评分"""
# 基础分数
base_score = 100
# 成交量因素
if avg_volume < 1000:
base_score -= 30
elif avg_volume < 5000:
base_score -= 15
# 成交量趋势因素
if volume_trend == 'decreasing':
base_score -= 20
elif volume_trend == 'increasing':
base_score += 10
# 持仓量因素
if avg_open_interest < 5000:
base_score -= 20
elif avg_open_interest < 20000:
base_score -= 10
# 持仓量趋势因素
if oi_trend == 'decreasing':
base_score -= 15
elif oi_trend == 'increasing':
base_score += 5
# 买卖价差因素
if bid_ask_spread > 0.5:
base_score -= 25
elif bid_ask_spread > 0.2:
base_score -= 10
# 确保分数在0-100之间
return max(0, min(100, base_score))
def _calculate_liquidity_risk(self, liquidity_score: float) -> str:
"""计算流动性风险"""
if liquidity_score < 30:
return 'high'
elif liquidity_score < 60:
return 'medium'
else:
return 'low'
def _analyze_spread(self, symbol: str) -> Dict:
"""分析价差"""
# 简化处理,实际应该比较当前合约和下一个合约的价差
# 这里返回模拟数据
return {
'current_next_spread': 5.2,
'spread_trend': 'stable',
'spread_ratio': 0.0015
}
def _generate_rollover_warning(self, delivery_info: Dict, liquidity_info: Dict) -> Dict:
"""生成换月预警"""
warning_level = delivery_info['warning_level']
liquidity_risk = liquidity_info['liquidity_risk']
# 综合预警
overall_warning = 'none'
if warning_level in ['critical', 'high'] or liquidity_risk == 'high':
overall_warning = 'high'
elif warning_level == 'medium' or liquidity_risk == 'medium':
overall_warning = 'medium'
elif warning_level == 'low':
overall_warning = 'low'
# 预警信息
warning_message = self._generate_warning_message(warning_level, liquidity_risk)
# 建议操作
recommended_actions = self._generate_recommended_actions(warning_level, liquidity_risk)
return {
'overall_warning': overall_warning,
'warning_message': warning_message,
'recommended_actions': recommended_actions
}
def _generate_warning_message(self, warning_level: str, liquidity_risk: str) -> str:
"""生成预警信息"""
messages = []
if warning_level == 'critical':
messages.append('合约即将到期距离交割日不足3天')
elif warning_level == 'high':
messages.append('合约接近到期距离交割日不足7天')
elif warning_level == 'medium':
messages.append('合约距离交割日不足15天建议开始关注换月')
if liquidity_risk == 'high':
messages.append('流动性风险较高,可能影响交易执行')
elif liquidity_risk == 'medium':
messages.append('流动性风险中等,建议谨慎交易')
if not messages:
return '合约状态正常,无需特殊关注'
return '; '.join(messages)
def _generate_recommended_actions(self, warning_level: str, liquidity_risk: str) -> List[str]:
"""生成建议操作"""
actions = []
if warning_level in ['critical', 'high']:
actions.append('立即开始换月操作')
actions.append('逐步减仓当前合约')
actions.append('在新合约建立相应仓位')
elif warning_level == 'medium':
actions.append('开始评估换月时机')
actions.append('关注新合约流动性')
if liquidity_risk == 'high':
actions.append('减小单笔交易规模')
actions.append('使用限价单而非市价单')
actions.append('考虑提前换月')
return actions
def _generate_position_adjustment(self, delivery_info: Dict, liquidity_info: Dict) -> Dict:
"""生成仓位调整建议"""
days_to_delivery = delivery_info['days_to_delivery']
warning_level = delivery_info['warning_level']
liquidity_risk = liquidity_info['liquidity_risk']
# 计算减仓比例
reduction_ratio = self._calculate_reduction_ratio(days_to_delivery, warning_level, liquidity_risk)
# 计算建议的减仓时间表
reduction_schedule = self._generate_reduction_schedule(days_to_delivery, reduction_ratio)
# 计算新合约建仓建议
new_contract_adjustment = self._generate_new_contract_adjustment(reduction_ratio)
return {
'reduction_ratio': reduction_ratio,
'reduction_schedule': reduction_schedule,
'new_contract_adjustment': new_contract_adjustment
}
def _calculate_reduction_ratio(self, days_to_delivery: int, warning_level: str, liquidity_risk: str) -> float:
"""计算减仓比例"""
# 基础减仓比例
base_ratio = 0.0
if warning_level == 'critical':
base_ratio = 0.9 # 减仓90%
elif warning_level == 'high':
base_ratio = 0.7 # 减仓70%
elif warning_level == 'medium':
base_ratio = 0.4 # 减仓40%
elif warning_level == 'low':
base_ratio = 0.2 # 减仓20%
# 流动性风险调整
if liquidity_risk == 'high':
base_ratio = min(1.0, base_ratio + 0.2)
elif liquidity_risk == 'medium':
base_ratio = min(1.0, base_ratio + 0.1)
return base_ratio
def _generate_reduction_schedule(self, days_to_delivery: int, reduction_ratio: float) -> List[Dict]:
"""生成减仓时间表"""
schedule = []
if days_to_delivery <= 3:
# 紧急减仓
schedule.append({
'timeframe': '今日',
'reduction_ratio': reduction_ratio
})
elif days_to_delivery <= 7:
# 快速减仓
daily_ratio = reduction_ratio / 3
for i in range(3):
schedule.append({
'timeframe': f'{i+1}天内',
'reduction_ratio': daily_ratio
})
elif days_to_delivery <= 15:
# 逐步减仓
daily_ratio = reduction_ratio / 5
for i in range(5):
schedule.append({
'timeframe': f'{i+1}天内',
'reduction_ratio': daily_ratio
})
elif days_to_delivery <= 30:
# 缓慢减仓
weekly_ratio = reduction_ratio / 2
schedule.append({
'timeframe': '第一周',
'reduction_ratio': weekly_ratio
})
schedule.append({
'timeframe': '第二周',
'reduction_ratio': weekly_ratio
})
return schedule
def _generate_new_contract_adjustment(self, reduction_ratio: float) -> Dict:
"""生成新合约建仓建议"""
# 建议在新合约建立与原合约相同方向的仓位
# 建仓比例应与减仓比例对应
return {
'direction': 'same_as_current',
'target_ratio': reduction_ratio,
'execution_strategy': 'gradual',
'considerations': [
'关注新合约流动性',
'注意合约间价差',
'避免在换月高峰期交易'
]
}
def monitor_rollover_risk(self, symbol: str, data: pd.DataFrame, position_size: float) -> Dict:
"""监控换月风险"""
# 分析换月情况
rollover_analysis = self.analyze_rollover(symbol, data)
# 计算风险暴露
risk_exposure = self._calculate_risk_exposure(position_size, rollover_analysis)
# 生成风险报告
risk_report = {
'symbol': symbol,
'position_size': position_size,
'rollover_analysis': rollover_analysis,
'risk_exposure': risk_exposure,
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
return risk_report
def _calculate_risk_exposure(self, position_size: float, rollover_analysis: Dict) -> Dict:
"""计算风险暴露"""
warning_level = rollover_analysis['warning_level']
liquidity_risk = rollover_analysis.get('liquidity_risk', 'low')
# 基础风险分数
base_risk = 0
if warning_level == 'critical':
base_risk = 90
elif warning_level == 'high':
base_risk = 70
elif warning_level == 'medium':
base_risk = 40
elif warning_level == 'low':
base_risk = 20
# 流动性风险调整
if liquidity_risk == 'high':
base_risk += 20
elif liquidity_risk == 'medium':
base_risk += 10
# 仓位大小调整
if position_size > 10:
base_risk += 15
elif position_size > 5:
base_risk += 5
# 确保风险分数在0-100之间
risk_score = max(0, min(100, base_risk))
# 风险等级
risk_level = 'low'
if risk_score >= 80:
risk_level = 'critical'
elif risk_score >= 60:
risk_level = 'high'
elif risk_score >= 30:
risk_level = 'medium'
return {
'risk_score': risk_score,
'risk_level': risk_level,
'recommendations': self._generate_risk_recommendations(risk_level)
}
def _generate_risk_recommendations(self, risk_level: str) -> List[str]:
"""生成风险建议"""
recommendations = []
if risk_level == 'critical':
recommendations.append('立即减仓至最小仓位')
recommendations.append('优先处理换月操作')
recommendations.append('密切监控市场流动性')
elif risk_level == 'high':
recommendations.append('大幅减仓当前合约')
recommendations.append('加速换月进程')
recommendations.append('使用限价单控制交易成本')
elif risk_level == 'medium':
recommendations.append('开始有序减仓')
recommendations.append('评估换月时机')
recommendations.append('关注新合约表现')
else:
recommendations.append('保持正常交易')
recommendations.append('定期监控合约到期情况')
return recommendations