You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
344 lines
11 KiB
344 lines
11 KiB
# backend/app/api/v2/quality.py
|
|
"""
|
|
数据质量监控 API 接口
|
|
支持质量评分查询、问题列表、监控规则管理
|
|
"""
|
|
|
|
from typing import Optional, List
|
|
from datetime import datetime
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from sqlalchemy.orm import Session
|
|
from app.db.base import get_db
|
|
from app.middleware.auth import get_current_user
|
|
from app.models.user import User
|
|
from app.services.quality_monitor import quality_monitor, QualityMetric, QualityLevel
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter(prefix="/api/v2/quality", tags=["质量监控"])
|
|
|
|
|
|
@router.get("/score", summary="查询质量评分")
|
|
async def get_quality_score(
|
|
symbol: Optional[str] = Query(None, description="品种代码,为空则查询所有品种"),
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
"""
|
|
查询数据质量评分
|
|
|
|
返回完整性、准确性、及时性、一致性四维评分
|
|
"""
|
|
try:
|
|
if symbol:
|
|
# 查询单个品种
|
|
scores = await quality_monitor.calculate_overall_score(db, symbol)
|
|
return {
|
|
"symbol": symbol,
|
|
"scores": scores
|
|
}
|
|
else:
|
|
# 查询所有品种
|
|
results = await quality_monitor.check_all_symbols(db)
|
|
return {
|
|
"total_symbols": len(results),
|
|
"scores": results,
|
|
"statistics": quality_monitor.get_statistics()
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 查询质量评分失败: {e}")
|
|
raise HTTPException(status_code=500, detail=f"查询质量评分失败: {str(e)}")
|
|
|
|
|
|
@router.get("/issues", summary="查询问题列表")
|
|
async def get_quality_issues(
|
|
page: int = Query(1, ge=1, description="页码"),
|
|
page_size: int = Query(20, ge=1, le=100, description="每页数量"),
|
|
symbol: Optional[str] = Query(None, description="品种代码筛选"),
|
|
metric: Optional[QualityMetric] = Query(None, description="质量指标筛选"),
|
|
level: Optional[QualityLevel] = Query(None, description="告警级别筛选"),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
"""
|
|
查询数据质量问题列表
|
|
|
|
支持按品种、指标、级别筛选
|
|
"""
|
|
try:
|
|
# 获取问题列表
|
|
all_issues = quality_monitor.get_issues(1000)
|
|
|
|
# 筛选
|
|
filtered_issues = all_issues
|
|
|
|
if symbol:
|
|
filtered_issues = [i for i in filtered_issues if i.get("symbol") == symbol]
|
|
|
|
if metric:
|
|
filtered_issues = [i for i in filtered_issues if i.get("metric") == metric.value]
|
|
|
|
if level:
|
|
filtered_issues = [i for i in filtered_issues if i.get("level") == level.value]
|
|
|
|
# 总数
|
|
total = len(filtered_issues)
|
|
|
|
# 分页
|
|
start = (page - 1) * page_size
|
|
end = start + page_size
|
|
page_issues = filtered_issues[start:end]
|
|
|
|
return {
|
|
"total": total,
|
|
"page": page,
|
|
"page_size": page_size,
|
|
"items": page_issues
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 查询问题列表失败: {e}")
|
|
raise HTTPException(status_code=500, detail=f"查询问题列表失败: {str(e)}")
|
|
|
|
|
|
@router.get("/history", summary="查询监控历史")
|
|
async def get_quality_history(
|
|
symbol: Optional[str] = Query(None, description="品种代码"),
|
|
metric: Optional[QualityMetric] = Query(None, description="质量指标"),
|
|
days: int = Query(7, ge=1, le=30, description="查询天数"),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
"""
|
|
查询质量监控历史数据
|
|
|
|
用于绘制趋势图表
|
|
"""
|
|
try:
|
|
# TODO: 从数据库查询历史记录
|
|
# 这里暂时返回模拟数据
|
|
|
|
history = []
|
|
for i in range(days):
|
|
date = datetime.now() - timedelta(days=i)
|
|
history.append({
|
|
"date": date.strftime("%Y-%m-%d"),
|
|
"completeness": 98.5 + random.uniform(-2, 2),
|
|
"accuracy": 99.8 + random.uniform(-1, 1),
|
|
"timeliness": 97.2 + random.uniform(-3, 3),
|
|
"consistency": 100.0,
|
|
"overall": 98.9 + random.uniform(-2, 2)
|
|
})
|
|
|
|
return {
|
|
"symbol": symbol or "ALL",
|
|
"metric": metric.value if metric else "overall",
|
|
"days": days,
|
|
"history": history
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 查询监控历史失败: {e}")
|
|
raise HTTPException(status_code=500, detail=f"查询监控历史失败: {str(e)}")
|
|
|
|
|
|
@router.get("/statistics", summary="查询监控统计")
|
|
async def get_quality_statistics(
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
"""
|
|
查询质量监控统计
|
|
|
|
包括检查次数、问题数量、各级别问题统计等
|
|
"""
|
|
try:
|
|
stats = quality_monitor.get_statistics()
|
|
|
|
# 统计各级别问题数量
|
|
issues = quality_monitor.get_issues(1000)
|
|
level_counts = {
|
|
"info": len([i for i in issues if i.get("level") == "info"]),
|
|
"warning": len([i for i in issues if i.get("level") == "warning"]),
|
|
"critical": len([i for i in issues if i.get("level") == "critical"])
|
|
}
|
|
|
|
# 统计各指标问题数量
|
|
metric_counts = {}
|
|
for metric in QualityMetric:
|
|
metric_counts[metric.value] = len([i for i in issues if i.get("metric") == metric.value])
|
|
|
|
return {
|
|
"monitor_stats": stats,
|
|
"level_counts": level_counts,
|
|
"metric_counts": metric_counts,
|
|
"total_issues": len(issues)
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 查询监控统计失败: {e}")
|
|
raise HTTPException(status_code=500, detail=f"查询监控统计失败: {str(e)}")
|
|
|
|
|
|
@router.post("/check", summary="触发质量检查")
|
|
async def trigger_quality_check(
|
|
symbol: Optional[str] = Query(None, description="品种代码,为空则检查所有品种"),
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
"""
|
|
手动触发质量检查
|
|
|
|
管理员可手动触发检查
|
|
"""
|
|
try:
|
|
if symbol:
|
|
# 检查单个品种
|
|
scores = await quality_monitor.calculate_overall_score(db, symbol)
|
|
return {
|
|
"symbol": symbol,
|
|
"scores": scores,
|
|
"issues": [i for i in quality_monitor.get_issues(100) if i.get("symbol") == symbol]
|
|
}
|
|
else:
|
|
# 检查所有品种
|
|
results = await quality_monitor.check_all_symbols(db)
|
|
return {
|
|
"total_symbols": len(results),
|
|
"scores": results,
|
|
"issues": quality_monitor.get_issues(100)
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 触发质量检查失败: {e}")
|
|
raise HTTPException(status_code=500, detail=f"触发质量检查失败: {str(e)}")
|
|
|
|
|
|
# ============== 监控规则管理 ==============
|
|
|
|
from pydantic import BaseModel, Field
|
|
|
|
|
|
class QualityRuleCreate(BaseModel):
|
|
"""创建质量规则"""
|
|
name: str = Field(..., description="规则名称")
|
|
symbol: Optional[str] = Field(None, description="品种代码,为空表示全局规则")
|
|
metric: QualityMetric = Field(..., description="监控指标")
|
|
condition: str = Field(..., description="条件表达式")
|
|
threshold: float = Field(..., description="阈值")
|
|
level: QualityLevel = Field(default=QualityLevel.WARNING, description="告警级别")
|
|
description: Optional[str] = Field(None, description="规则描述")
|
|
|
|
|
|
class QualityRuleUpdate(BaseModel):
|
|
"""更新质量规则"""
|
|
name: Optional[str] = None
|
|
symbol: Optional[str] = None
|
|
metric: Optional[QualityMetric] = None
|
|
condition: Optional[str] = None
|
|
threshold: Optional[float] = None
|
|
level: Optional[QualityLevel] = None
|
|
description: Optional[str] = None
|
|
enabled: Optional[bool] = None
|
|
|
|
|
|
@router.post("/rules", summary="创建监控规则")
|
|
async def create_quality_rule(
|
|
rule_data: QualityRuleCreate,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
"""
|
|
创建数据质量监控规则
|
|
"""
|
|
try:
|
|
# TODO: 实现规则创建
|
|
|
|
return {
|
|
"status": "success",
|
|
"message": "监控规则创建成功",
|
|
"rule": rule_data.model_dump()
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 创建监控规则失败: {e}")
|
|
raise HTTPException(status_code=500, detail=f"创建监控规则失败: {str(e)}")
|
|
|
|
|
|
@router.get("/rules", summary="查询监控规则列表")
|
|
async def get_quality_rules(
|
|
page: int = Query(1, ge=1),
|
|
page_size: int = Query(20, ge=1, le=100),
|
|
metric: Optional[QualityMetric] = Query(None),
|
|
enabled: Optional[bool] = Query(None),
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
"""
|
|
查询监控规则列表
|
|
"""
|
|
try:
|
|
# TODO: 实现规则查询
|
|
|
|
return {
|
|
"total": 0,
|
|
"page": page,
|
|
"page_size": page_size,
|
|
"items": []
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 查询监控规则列表失败: {e}")
|
|
raise HTTPException(status_code=500, detail=f"查询监控规则列表失败: {str(e)}")
|
|
|
|
|
|
@router.put("/rules/{rule_id}", summary="更新监控规则")
|
|
async def update_quality_rule(
|
|
rule_id: int,
|
|
rule_data: QualityRuleUpdate,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
"""
|
|
更新监控规则
|
|
"""
|
|
try:
|
|
# TODO: 实现规则更新
|
|
|
|
return {
|
|
"status": "success",
|
|
"message": "监控规则更新成功",
|
|
"rule_id": rule_id
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 更新监控规则失败: {e}")
|
|
raise HTTPException(status_code=500, detail=f"更新监控规则失败: {str(e)}")
|
|
|
|
|
|
@router.delete("/rules/{rule_id}", summary="删除监控规则")
|
|
async def delete_quality_rule(
|
|
rule_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
"""
|
|
删除监控规则
|
|
"""
|
|
try:
|
|
# TODO: 实现规则删除
|
|
|
|
return {
|
|
"status": "success",
|
|
"message": "监控规则删除成功",
|
|
"rule_id": rule_id
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 删除监控规则失败: {e}")
|
|
raise HTTPException(status_code=500, detail=f"删除监控规则失败: {str(e)}")
|
|
|
|
|
|
# ============== 导入依赖 ==============
|
|
|
|
from datetime import timedelta
|
|
import random |