You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

344 lines
11 KiB

# backend/app/api/v2/quality.py
"""
数据质量监控 API 接口
支持质量评分查询问题列表监控规则管理
"""
from typing import Optional, List
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from app.db.base import get_db
from app.middleware.auth import get_current_user
from app.models.user import User
from app.services.quality_monitor import quality_monitor, QualityMetric, QualityLevel
import logging
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v2/quality", tags=["质量监控"])
@router.get("/score", summary="查询质量评分")
async def get_quality_score(
symbol: Optional[str] = Query(None, description="品种代码,为空则查询所有品种"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
查询数据质量评分
返回完整性准确性及时性一致性四维评分
"""
try:
if symbol:
# 查询单个品种
scores = await quality_monitor.calculate_overall_score(db, symbol)
return {
"symbol": symbol,
"scores": scores
}
else:
# 查询所有品种
results = await quality_monitor.check_all_symbols(db)
return {
"total_symbols": len(results),
"scores": results,
"statistics": quality_monitor.get_statistics()
}
except Exception as e:
logger.error(f"❌ 查询质量评分失败: {e}")
raise HTTPException(status_code=500, detail=f"查询质量评分失败: {str(e)}")
@router.get("/issues", summary="查询问题列表")
async def get_quality_issues(
page: int = Query(1, ge=1, description="页码"),
page_size: int = Query(20, ge=1, le=100, description="每页数量"),
symbol: Optional[str] = Query(None, description="品种代码筛选"),
metric: Optional[QualityMetric] = Query(None, description="质量指标筛选"),
level: Optional[QualityLevel] = Query(None, description="告警级别筛选"),
current_user: User = Depends(get_current_user)
):
"""
查询数据质量问题列表
支持按品种指标级别筛选
"""
try:
# 获取问题列表
all_issues = quality_monitor.get_issues(1000)
# 筛选
filtered_issues = all_issues
if symbol:
filtered_issues = [i for i in filtered_issues if i.get("symbol") == symbol]
if metric:
filtered_issues = [i for i in filtered_issues if i.get("metric") == metric.value]
if level:
filtered_issues = [i for i in filtered_issues if i.get("level") == level.value]
# 总数
total = len(filtered_issues)
# 分页
start = (page - 1) * page_size
end = start + page_size
page_issues = filtered_issues[start:end]
return {
"total": total,
"page": page,
"page_size": page_size,
"items": page_issues
}
except Exception as e:
logger.error(f"❌ 查询问题列表失败: {e}")
raise HTTPException(status_code=500, detail=f"查询问题列表失败: {str(e)}")
@router.get("/history", summary="查询监控历史")
async def get_quality_history(
symbol: Optional[str] = Query(None, description="品种代码"),
metric: Optional[QualityMetric] = Query(None, description="质量指标"),
days: int = Query(7, ge=1, le=30, description="查询天数"),
current_user: User = Depends(get_current_user)
):
"""
查询质量监控历史数据
用于绘制趋势图表
"""
try:
# TODO: 从数据库查询历史记录
# 这里暂时返回模拟数据
history = []
for i in range(days):
date = datetime.now() - timedelta(days=i)
history.append({
"date": date.strftime("%Y-%m-%d"),
"completeness": 98.5 + random.uniform(-2, 2),
"accuracy": 99.8 + random.uniform(-1, 1),
"timeliness": 97.2 + random.uniform(-3, 3),
"consistency": 100.0,
"overall": 98.9 + random.uniform(-2, 2)
})
return {
"symbol": symbol or "ALL",
"metric": metric.value if metric else "overall",
"days": days,
"history": history
}
except Exception as e:
logger.error(f"❌ 查询监控历史失败: {e}")
raise HTTPException(status_code=500, detail=f"查询监控历史失败: {str(e)}")
@router.get("/statistics", summary="查询监控统计")
async def get_quality_statistics(
current_user: User = Depends(get_current_user)
):
"""
查询质量监控统计
包括检查次数问题数量各级别问题统计等
"""
try:
stats = quality_monitor.get_statistics()
# 统计各级别问题数量
issues = quality_monitor.get_issues(1000)
level_counts = {
"info": len([i for i in issues if i.get("level") == "info"]),
"warning": len([i for i in issues if i.get("level") == "warning"]),
"critical": len([i for i in issues if i.get("level") == "critical"])
}
# 统计各指标问题数量
metric_counts = {}
for metric in QualityMetric:
metric_counts[metric.value] = len([i for i in issues if i.get("metric") == metric.value])
return {
"monitor_stats": stats,
"level_counts": level_counts,
"metric_counts": metric_counts,
"total_issues": len(issues)
}
except Exception as e:
logger.error(f"❌ 查询监控统计失败: {e}")
raise HTTPException(status_code=500, detail=f"查询监控统计失败: {str(e)}")
@router.post("/check", summary="触发质量检查")
async def trigger_quality_check(
symbol: Optional[str] = Query(None, description="品种代码,为空则检查所有品种"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
手动触发质量检查
管理员可手动触发检查
"""
try:
if symbol:
# 检查单个品种
scores = await quality_monitor.calculate_overall_score(db, symbol)
return {
"symbol": symbol,
"scores": scores,
"issues": [i for i in quality_monitor.get_issues(100) if i.get("symbol") == symbol]
}
else:
# 检查所有品种
results = await quality_monitor.check_all_symbols(db)
return {
"total_symbols": len(results),
"scores": results,
"issues": quality_monitor.get_issues(100)
}
except Exception as e:
logger.error(f"❌ 触发质量检查失败: {e}")
raise HTTPException(status_code=500, detail=f"触发质量检查失败: {str(e)}")
# ============== 监控规则管理 ==============
from pydantic import BaseModel, Field
class QualityRuleCreate(BaseModel):
"""创建质量规则"""
name: str = Field(..., description="规则名称")
symbol: Optional[str] = Field(None, description="品种代码,为空表示全局规则")
metric: QualityMetric = Field(..., description="监控指标")
condition: str = Field(..., description="条件表达式")
threshold: float = Field(..., description="阈值")
level: QualityLevel = Field(default=QualityLevel.WARNING, description="告警级别")
description: Optional[str] = Field(None, description="规则描述")
class QualityRuleUpdate(BaseModel):
"""更新质量规则"""
name: Optional[str] = None
symbol: Optional[str] = None
metric: Optional[QualityMetric] = None
condition: Optional[str] = None
threshold: Optional[float] = None
level: Optional[QualityLevel] = None
description: Optional[str] = None
enabled: Optional[bool] = None
@router.post("/rules", summary="创建监控规则")
async def create_quality_rule(
rule_data: QualityRuleCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
创建数据质量监控规则
"""
try:
# TODO: 实现规则创建
return {
"status": "success",
"message": "监控规则创建成功",
"rule": rule_data.model_dump()
}
except Exception as e:
logger.error(f"❌ 创建监控规则失败: {e}")
raise HTTPException(status_code=500, detail=f"创建监控规则失败: {str(e)}")
@router.get("/rules", summary="查询监控规则列表")
async def get_quality_rules(
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
metric: Optional[QualityMetric] = Query(None),
enabled: Optional[bool] = Query(None),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
查询监控规则列表
"""
try:
# TODO: 实现规则查询
return {
"total": 0,
"page": page,
"page_size": page_size,
"items": []
}
except Exception as e:
logger.error(f"❌ 查询监控规则列表失败: {e}")
raise HTTPException(status_code=500, detail=f"查询监控规则列表失败: {str(e)}")
@router.put("/rules/{rule_id}", summary="更新监控规则")
async def update_quality_rule(
rule_id: int,
rule_data: QualityRuleUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
更新监控规则
"""
try:
# TODO: 实现规则更新
return {
"status": "success",
"message": "监控规则更新成功",
"rule_id": rule_id
}
except Exception as e:
logger.error(f"❌ 更新监控规则失败: {e}")
raise HTTPException(status_code=500, detail=f"更新监控规则失败: {str(e)}")
@router.delete("/rules/{rule_id}", summary="删除监控规则")
async def delete_quality_rule(
rule_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
删除监控规则
"""
try:
# TODO: 实现规则删除
return {
"status": "success",
"message": "监控规则删除成功",
"rule_id": rule_id
}
except Exception as e:
logger.error(f"❌ 删除监控规则失败: {e}")
raise HTTPException(status_code=500, detail=f"删除监控规则失败: {str(e)}")
# ============== 导入依赖 ==============
from datetime import timedelta
import random