You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

516 lines
17 KiB

# backend/app/api/v2/alert.py
"""
告警 API 接口
支持告警规则的 CRUD 操作
"""
from typing import Optional, List
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from app.db.base import get_db
from app.middleware.auth import get_current_user
from app.models.user import User
from app.models.alert import (
AlertRule, AlertHistory,
AlertRuleCreate, AlertRuleUpdate, AlertRuleResponse,
AlertHistoryResponse, AlertListResponse, AlertHistoryListResponse,
AlertType, AlertOperator, NotifyChannel
)
from app.services.alert_engine import alert_engine
from app.services.alert_notification import alert_notification
import logging
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v2/alert", tags=["告警服务"])
@router.post("/rules", response_model=AlertRuleResponse, summary="创建告警规则")
async def create_alert_rule(
rule_data: AlertRuleCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
创建告警规则
- **name**: 告警名称
- **symbol**: 品种代码 IF2406
- **type**: 告警类型price/change_percent/technical/volume
- **condition**: 触发条件JSON 结构
- **channels**: 通知渠道列表
"""
try:
# 创建规则
rule = AlertRule(
user_id=current_user.id,
name=rule_data.name,
symbol=rule_data.symbol,
type=rule_data.type.value if hasattr(rule_data.type, 'value') else rule_data.type,
condition=rule_data.condition.model_dump(),
channels=[c.value if hasattr(c, 'value') else c for c in rule_data.channels],
enabled=rule_data.enabled,
start_time=rule_data.start_time,
end_time=rule_data.end_time,
repeat_interval=rule_data.repeat_interval
)
db.add(rule)
db.commit()
db.refresh(rule)
# 加载到缓存
await alert_engine.load_user_rules(db, current_user.id)
logger.info(f"✅ 用户 {current_user.id} 创建告警规则 {rule.id}: {rule.name}")
return AlertRuleResponse(
id=rule.id,
user_id=rule.user_id,
name=rule.name,
symbol=rule.symbol,
type=rule.type,
condition=rule.condition,
channels=rule.channels,
enabled=rule.enabled,
start_time=str(rule.start_time) if rule.start_time else None,
end_time=str(rule.end_time) if rule.end_time else None,
repeat_interval=rule.repeat_interval,
last_triggered_at=rule.last_triggered_at,
trigger_count=rule.trigger_count,
created_at=rule.created_at,
updated_at=rule.updated_at
)
except Exception as e:
logger.error(f"❌ 创建告警规则失败: {e}")
db.rollback()
raise HTTPException(status_code=500, detail=f"创建告警规则失败: {str(e)}")
@router.get("/rules", response_model=AlertListResponse, summary="查询告警规则列表")
async def get_alert_rules(
page: int = Query(1, ge=1, description="页码"),
page_size: int = Query(20, ge=1, le=100, description="每页数量"),
symbol: Optional[str] = Query(None, description="品种代码筛选"),
type: Optional[AlertType] = Query(None, description="告警类型筛选"),
enabled: Optional[bool] = Query(None, description="是否启用筛选"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
查询告警规则列表
支持按品种类型状态筛选
"""
try:
# 构造查询
query = db.query(AlertRule).filter(AlertRule.user_id == current_user.id)
if symbol:
query = query.filter(AlertRule.symbol == symbol)
if type:
query = query.filter(AlertRule.type == type.value)
if enabled is not None:
query = query.filter(AlertRule.enabled == enabled)
# 总数
total = query.count()
# 分页
rules = query.order_by(AlertRule.created_at.desc()).offset((page - 1) * page_size).limit(page_size).all()
# 构造响应
items = [
AlertRuleResponse(
id=rule.id,
user_id=rule.user_id,
name=rule.name,
symbol=rule.symbol,
type=rule.type,
condition=rule.condition,
channels=rule.channels,
enabled=rule.enabled,
start_time=str(rule.start_time) if rule.start_time else None,
end_time=str(rule.end_time) if rule.end_time else None,
repeat_interval=rule.repeat_interval,
last_triggered_at=rule.last_triggered_at,
trigger_count=rule.trigger_count,
created_at=rule.created_at,
updated_at=rule.updated_at
)
for rule in rules
]
return AlertListResponse(
total=total,
page=page,
page_size=page_size,
items=items
)
except Exception as e:
logger.error(f"❌ 查询告警规则列表失败: {e}")
raise HTTPException(status_code=500, detail=f"查询告警规则列表失败: {str(e)}")
@router.get("/rules/{rule_id}", response_model=AlertRuleResponse, summary="查询告警规则详情")
async def get_alert_rule(
rule_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
查询告警规则详情
"""
try:
rule = db.query(AlertRule).filter(
AlertRule.id == rule_id,
AlertRule.user_id == current_user.id
).first()
if not rule:
raise HTTPException(status_code=404, detail="告警规则不存在")
return AlertRuleResponse(
id=rule.id,
user_id=rule.user_id,
name=rule.name,
symbol=rule.symbol,
type=rule.type,
condition=rule.condition,
channels=rule.channels,
enabled=rule.enabled,
start_time=str(rule.start_time) if rule.start_time else None,
end_time=str(rule.end_time) if rule.end_time else None,
repeat_interval=rule.repeat_interval,
last_triggered_at=rule.last_triggered_at,
trigger_count=rule.trigger_count,
created_at=rule.created_at,
updated_at=rule.updated_at
)
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ 查询告警规则详情失败: {e}")
raise HTTPException(status_code=500, detail=f"查询告警规则详情失败: {str(e)}")
@router.put("/rules/{rule_id}", response_model=AlertRuleResponse, summary="更新告警规则")
async def update_alert_rule(
rule_id: int,
rule_data: AlertRuleUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
更新告警规则
"""
try:
rule = db.query(AlertRule).filter(
AlertRule.id == rule_id,
AlertRule.user_id == current_user.id
).first()
if not rule:
raise HTTPException(status_code=404, detail="告警规则不存在")
# 更新字段
if rule_data.name is not None:
rule.name = rule_data.name
if rule_data.symbol is not None:
rule.symbol = rule_data.symbol
if rule_data.type is not None:
rule.type = rule_data.type.value if hasattr(rule_data.type, 'value') else rule_data.type
if rule_data.condition is not None:
rule.condition = rule_data.condition.model_dump()
if rule_data.channels is not None:
rule.channels = [c.value if hasattr(c, 'value') else c for c in rule_data.channels]
if rule_data.enabled is not None:
rule.enabled = rule_data.enabled
if rule_data.start_time is not None:
rule.start_time = rule_data.start_time
if rule_data.end_time is not None:
rule.end_time = rule_data.end_time
if rule_data.repeat_interval is not None:
rule.repeat_interval = rule_data.repeat_interval
rule.updated_at = datetime.now()
db.commit()
db.refresh(rule)
# 更新缓存
await alert_engine.load_user_rules(db, current_user.id)
logger.info(f"✅ 用户 {current_user.id} 更新告警规则 {rule_id}")
return AlertRuleResponse(
id=rule.id,
user_id=rule.user_id,
name=rule.name,
symbol=rule.symbol,
type=rule.type,
condition=rule.condition,
channels=rule.channels,
enabled=rule.enabled,
start_time=str(rule.start_time) if rule.start_time else None,
end_time=str(rule.end_time) if rule.end_time else None,
repeat_interval=rule.repeat_interval,
last_triggered_at=rule.last_triggered_at,
trigger_count=rule.trigger_count,
created_at=rule.created_at,
updated_at=rule.updated_at
)
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ 更新告警规则失败: {e}")
db.rollback()
raise HTTPException(status_code=500, detail=f"更新告警规则失败: {str(e)}")
@router.delete("/rules/{rule_id}", summary="删除告警规则")
async def delete_alert_rule(
rule_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
删除告警规则
"""
try:
rule = db.query(AlertRule).filter(
AlertRule.id == rule_id,
AlertRule.user_id == current_user.id
).first()
if not rule:
raise HTTPException(status_code=404, detail="告警规则不存在")
db.delete(rule)
db.commit()
# 更新缓存
await alert_engine.load_user_rules(db, current_user.id)
logger.info(f"✅ 用户 {current_user.id} 删除告警规则 {rule_id}")
return {"status": "success", "message": "告警规则已删除"}
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ 删除告警规则失败: {e}")
db.rollback()
raise HTTPException(status_code=500, detail=f"删除告警规则失败: {str(e)}")
@router.post("/rules/{rule_id}/enable", summary="启用告警规则")
async def enable_alert_rule(
rule_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
启用告警规则
"""
try:
rule = db.query(AlertRule).filter(
AlertRule.id == rule_id,
AlertRule.user_id == current_user.id
).first()
if not rule:
raise HTTPException(status_code=404, detail="告警规则不存在")
rule.enabled = True
rule.updated_at = datetime.now()
db.commit()
# 更新缓存
await alert_engine.load_user_rules(db, current_user.id)
logger.info(f"✅ 用户 {current_user.id} 启用告警规则 {rule_id}")
return {"status": "success", "message": "告警规则已启用"}
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ 启用告警规则失败: {e}")
db.rollback()
raise HTTPException(status_code=500, detail=f"启用告警规则失败: {str(e)}")
@router.post("/rules/{rule_id}/disable", summary="禁用告警规则")
async def disable_alert_rule(
rule_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
禁用告警规则
"""
try:
rule = db.query(AlertRule).filter(
AlertRule.id == rule_id,
AlertRule.user_id == current_user.id
).first()
if not rule:
raise HTTPException(status_code=404, detail="告警规则不存在")
rule.enabled = False
rule.updated_at = datetime.now()
db.commit()
# 更新缓存
await alert_engine.load_user_rules(db, current_user.id)
logger.info(f"✅ 用户 {current_user.id} 禁用告警规则 {rule_id}")
return {"status": "success", "message": "告警规则已禁用"}
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ 禁用告警规则失败: {e}")
db.rollback()
raise HTTPException(status_code=500, detail=f"禁用告警规则失败: {str(e)}")
@router.get("/history", response_model=AlertHistoryListResponse, summary="查询告警历史")
async def get_alert_history(
page: int = Query(1, ge=1, description="页码"),
page_size: int = Query(20, ge=1, le=100, description="每页数量"),
rule_id: Optional[int] = Query(None, description="规则 ID 筛选"),
symbol: Optional[str] = Query(None, description="品种代码筛选"),
start_date: Optional[datetime] = Query(None, description="开始日期"),
end_date: Optional[datetime] = Query(None, description="结束日期"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
查询告警历史
支持按规则品种时间范围筛选
"""
try:
# 构造查询
query = db.query(AlertHistory).filter(AlertHistory.user_id == current_user.id)
if rule_id:
query = query.filter(AlertHistory.rule_id == rule_id)
if symbol:
query = query.filter(AlertHistory.symbol == symbol)
if start_date:
query = query.filter(AlertHistory.trigger_time >= start_date)
if end_date:
query = query.filter(AlertHistory.trigger_time <= end_date)
# 总数
total = query.count()
# 分页
histories = query.order_by(AlertHistory.trigger_time.desc()).offset((page - 1) * page_size).limit(page_size).all()
# 构造响应
items = [
AlertHistoryResponse(
id=h.id,
rule_id=h.rule_id,
user_id=h.user_id,
symbol=h.symbol,
trigger_value=float(h.trigger_value) if h.trigger_value else None,
trigger_condition=h.trigger_condition,
notified=h.notified,
notify_channels=h.notify_channels,
notify_time=h.notify_time,
trigger_time=h.trigger_time,
created_at=h.created_at
)
for h in histories
]
return AlertHistoryListResponse(
total=total,
page=page,
page_size=page_size,
items=items
)
except Exception as e:
logger.error(f"❌ 查询告警历史失败: {e}")
raise HTTPException(status_code=500, detail=f"查询告警历史失败: {str(e)}")
@router.get("/statistics", summary="查询告警统计")
async def get_alert_statistics(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
查询告警统计
包括规则数量触发次数通知统计等
"""
try:
# 规则统计
total_rules = db.query(AlertRule).filter(AlertRule.user_id == current_user.id).count()
enabled_rules = db.query(AlertRule).filter(
AlertRule.user_id == current_user.id,
AlertRule.enabled == True
).count()
# 历史统计
total_triggers = db.query(AlertHistory).filter(AlertHistory.user_id == current_user.id).count()
# 今日触发
today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
today_triggers = db.query(AlertHistory).filter(
AlertHistory.user_id == current_user.id,
AlertHistory.trigger_time >= today_start
).count()
# 引擎统计
engine_stats = alert_engine.get_statistics()
# 通知统计
notification_stats = alert_notification.get_statistics()
return {
"rules": {
"total": total_rules,
"enabled": enabled_rules,
"disabled": total_rules - enabled_rules
},
"history": {
"total_triggers": total_triggers,
"today_triggers": today_triggers
},
"engine": engine_stats,
"notification": notification_stats
}
except Exception as e:
logger.error(f"❌ 查询告警统计失败: {e}")
raise HTTPException(status_code=500, detail=f"查询告警统计失败: {str(e)}")