|
|
# backend/app/services/quality_monitor.py
|
|
|
"""
|
|
|
数据质量监控服务
|
|
|
支持完整性、准确性、及时性、一致性四维监控,问题发现<1分钟
|
|
|
"""
|
|
|
|
|
|
import asyncio
|
|
|
from typing import Dict, List, Optional
|
|
|
from datetime import datetime, timedelta
|
|
|
from enum import Enum
|
|
|
from sqlalchemy.orm import Session
|
|
|
from sqlalchemy import func, and_
|
|
|
from app.models.kline import Kline
|
|
|
from app.services.cache_service import cache_service
|
|
|
from app.config import settings
|
|
|
import logging
|
|
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
class QualityMetric(str, Enum):
    """
    Quality dimensions scored by the monitor.

    Members (value in parentheses):
    - COMPLETENESS ("completeness"): missing data
    - ACCURACY ("accuracy"): abnormal prices
    - TIMELINESS ("timeliness"): data latency
    - CONSISTENCY ("consistency"): cache vs. database agreement

    The ``str`` mixin makes every member compare equal to its value string.
    """

    COMPLETENESS = "completeness"
    ACCURACY = "accuracy"
    TIMELINESS = "timeliness"
    CONSISTENCY = "consistency"
|
|
|
|
|
|
|
|
|
class QualityLevel(str, Enum):
    """
    Alert severity attached to each recorded quality issue.

    Members (value in parentheses):
    - INFO ("info"): informational
    - WARNING ("warning"): warning
    - CRITICAL ("critical"): critical

    The ``str`` mixin makes every member compare equal to its value string.
    """

    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"
|
|
|
|
|
|
|
|
|
class QualityMonitor:
    """
    Data quality monitoring service.

    Features:
    - Completeness check (missing bars)
    - Accuracy check (abnormal price moves)
    - Timeliness check (data latency)
    - Consistency check (cache vs. database)
    - Quality score calculation
    - Issue recording (alerting)

    Performance notes:
    - Designed to be run periodically (every minute)
    - Checks all symbols in one pass
    - Keeps the latest scores per symbol in memory

    NOTE: ``db`` is a synchronous SQLAlchemy ``Session``; its queries run
    inside these ``async`` methods and block the event loop while executing.
    """

    # Expected bars per *trading* day for each period. The "1m" figure assumes
    # roughly 4 trading hours per day (240 one-minute bars); the others scale
    # from it. "1w" is one bar per 5-trading-day week (1/5 per trading day).
    _EXPECTED_BARS_PER_DAY = {
        "1m": 240,
        "5m": 48,
        "15m": 16,
        "30m": 8,
        "60m": 4,
        "1d": 1,
        "1w": 0.2,
    }
    # Fallback for periods not listed above (same as "1m").
    _DEFAULT_BARS_PER_DAY = 240

    # Lookback window for completeness/accuracy, and the number of trading
    # days assumed to fall inside it (5-day trading week).
    _LOOKBACK = timedelta(days=7)
    _TRADING_DAYS = 5

    # Maximum acceptable age, in seconds, of the newest bar for each period.
    _EXPECTED_DELAY_SECONDS = {
        "1m": 60,
        "5m": 300,
        "15m": 900,
        "30m": 1800,
        "60m": 3600,
        "1d": 86400,
        "1w": 604800,
    }
    # Fallback for periods not listed above (same as "1m").
    _DEFAULT_DELAY_SECONDS = 60

    def __init__(self, max_issues: Optional[int] = 10000):
        """
        Args:
            max_issues: maximum number of issues retained in
                ``quality_issues``; the oldest entries are dropped first.
                ``None`` keeps every issue (unbounded).
        """
        # Latest score snapshot per symbol (symbol -> scores)
        self.quality_scores: Dict[str, Dict[str, float]] = {}

        # Recorded issues, oldest first
        self.quality_issues: List[dict] = []
        self.max_issues = max_issues

        # Counters
        self.total_checks = 0
        self.total_issues = 0

    async def check_completeness(self, db: Session, symbol: str, period: str = "1m") -> float:
        """
        Check completeness: bars stored vs. bars expected.

        Counts ``period`` bars for ``symbol`` over the last 7 days and compares
        the count with ``bars-per-trading-day * 5``. Records a WARNING issue
        when the score is below 80.

        Args:
            db: database session
            symbol: instrument code
            period: bar period (e.g. "1m", "5m", "1d"); unknown periods use
                the 1-minute expectation

        Returns:
            float: completeness score (0-100); 0.0 if the check itself fails
        """
        try:
            start_time = datetime.now() - self._LOOKBACK

            expected_per_day = self._EXPECTED_BARS_PER_DAY.get(period, self._DEFAULT_BARS_PER_DAY)
            expected_count = expected_per_day * self._TRADING_DAYS

            actual_count = db.query(func.count(Kline.id)).filter(
                and_(
                    Kline.symbol == symbol,
                    Kline.period == period,
                    Kline.time >= start_time,
                )
            ).scalar() or 0

            if expected_count == 0:
                return 100.0

            # Capped at 100: more bars than expected is not penalised.
            completeness = min(actual_count / expected_count * 100, 100.0)

            if completeness < 80:
                await self._record_issue(
                    symbol, QualityMetric.COMPLETENESS,
                    completeness, 80,
                    QualityLevel.WARNING,
                    f"数据完整性不足:预期 {expected_count} 条,实际 {actual_count} 条",
                )

            return completeness

        except Exception as e:
            logger.exception(f"❌ 完整性检测失败: {e}")
            return 0.0

    async def check_accuracy(self, db: Session, symbol: str) -> float:
        """
        Check accuracy: share of bars without an abnormal price move.

        A bar is abnormal when ``|change_percent| > 10``. Bars of all periods
        for ``symbol`` from the last 7 days are considered. Records a WARNING
        issue when the score is below 95.

        Args:
            db: database session
            symbol: instrument code

        Returns:
            float: accuracy score (0-100); 100.0 when there is no data;
            0.0 if the check itself fails
        """
        try:
            start_time = datetime.now() - self._LOOKBACK

            # Use SQL ABS() explicitly: Python's builtin abs() on a column
            # expression only works if the installed SQLAlchemy implements
            # __abs__ for column operators.
            abnormal_count = db.query(func.count(Kline.id)).filter(
                and_(
                    Kline.symbol == symbol,
                    func.abs(Kline.change_percent) > 10,
                    Kline.time >= start_time,
                )
            ).scalar() or 0

            total_count = db.query(func.count(Kline.id)).filter(
                and_(
                    Kline.symbol == symbol,
                    Kline.time >= start_time,
                )
            ).scalar()

            if not total_count:
                return 100.0

            accuracy = max((1 - abnormal_count / total_count) * 100, 0.0)

            if accuracy < 95:
                await self._record_issue(
                    symbol, QualityMetric.ACCURACY,
                    accuracy, 95,
                    QualityLevel.WARNING,
                    f"数据准确性不足:发现 {abnormal_count} 条异常数据",
                )

            return accuracy

        except Exception as e:
            logger.exception(f"❌ 准确性检测失败: {e}")
            return 0.0

    async def check_timeliness(self, db: Session, symbol: str, period: str = "1m") -> float:
        """
        Check timeliness: age of the newest ``period`` bar for ``symbol``.

        Full score while the newest bar is no older than the period's allowed
        delay; beyond that, 1 point is lost per extra minute (floored at 0).
        Records a WARNING issue below 80, CRITICAL at 50 or below, and a
        CRITICAL issue when no bar exists at all.

        Args:
            db: database session
            symbol: instrument code
            period: bar period; unknown periods allow a 60-second delay

        Returns:
            float: timeliness score (0-100); 0.0 if there is no data or the
            check itself fails
        """
        try:
            latest = db.query(Kline).filter(
                and_(
                    Kline.symbol == symbol,
                    Kline.period == period,
                )
            ).order_by(Kline.time.desc()).first()

            if not latest:
                await self._record_issue(
                    symbol, QualityMetric.TIMELINESS,
                    0, 80,
                    QualityLevel.CRITICAL,
                    f"无 {period} 周期数据",
                )
                return 0.0

            # NOTE(review): assumes Kline.time is a naive local datetime, like
            # datetime.now(); an aware value would raise TypeError here.
            delay = (datetime.now() - latest.time).total_seconds()

            expected_delay = self._EXPECTED_DELAY_SECONDS.get(period, self._DEFAULT_DELAY_SECONDS)

            if delay <= expected_delay:
                timeliness = 100.0
            else:
                # Lose 1 point per minute beyond the allowed delay.
                timeliness = max(0.0, 100 - (delay - expected_delay) / 60)

            if timeliness < 80:
                level = QualityLevel.WARNING if timeliness > 50 else QualityLevel.CRITICAL
                await self._record_issue(
                    symbol, QualityMetric.TIMELINESS,
                    timeliness, 80,
                    level,
                    f"数据延迟:最新数据时间为 {latest.time.strftime('%Y-%m-%d %H:%M:%S')}, 延迟 {delay:.0f} 秒",
                )

            return timeliness

        except Exception as e:
            logger.exception(f"❌ 及时性检测失败: {e}")
            return 0.0

    async def check_consistency(self, db: Session, symbol: str) -> float:
        """
        Check consistency between the cached latest quote and the database.

        Compares the cache's ``price`` with the ``close`` of the newest Kline
        row for ``symbol`` (any period). A relative difference below 0.01%
        (denominator floored at 1) counts as consistent. Missing cache data or
        missing rows count as consistent. Otherwise a WARNING issue is recorded.

        Args:
            db: database session
            symbol: instrument code

        Returns:
            float: 100.0 if consistent (or data missing), else 0.0; 0.0 if the
            check itself fails
        """
        try:
            cache_data = await cache_service.get_latest_quote(symbol)

            # Simplification: the newest Kline row stands in for the DB quote.
            latest = db.query(Kline).filter(
                Kline.symbol == symbol
            ).order_by(Kline.time.desc()).first()

            if not cache_data or not latest:
                return 100.0

            cache_price = float(cache_data.get("price", 0))
            db_price = float(latest.close)

            # Allow a tiny relative error (0.01%); max(..., 1) avoids division by zero.
            if abs(cache_price - db_price) / max(db_price, 1) < 0.0001:
                return 100.0

            await self._record_issue(
                symbol, QualityMetric.CONSISTENCY,
                0, 100,
                QualityLevel.WARNING,
                f"缓存与数据库不一致:缓存价格 {cache_price}, 数据库价格 {db_price}",
            )
            return 0.0

        except Exception as e:
            logger.exception(f"❌ 一致性检测失败: {e}")
            return 0.0

    async def calculate_overall_score(self, db: Session, symbol: str) -> Dict[str, float]:
        """
        Run all four checks for ``symbol`` and cache the result.

        Completeness and timeliness use the "1m" period. ``overall`` is the
        unweighted mean of the four scores.

        Args:
            db: database session
            symbol: instrument code

        Returns:
            Dict[str, float]: keys "completeness", "accuracy", "timeliness",
            "consistency", "overall", plus "time" (an ISO-8601 string, despite
            the annotation). The same dict is stored in ``quality_scores``.
        """
        completeness = await self.check_completeness(db, symbol, "1m")
        accuracy = await self.check_accuracy(db, symbol)
        timeliness = await self.check_timeliness(db, symbol, "1m")
        consistency = await self.check_consistency(db, symbol)

        overall = (completeness + accuracy + timeliness + consistency) / 4

        self.quality_scores[symbol] = {
            "completeness": completeness,
            "accuracy": accuracy,
            "timeliness": timeliness,
            "consistency": consistency,
            "overall": overall,
            "time": datetime.now().isoformat(),
        }
        return self.quality_scores[symbol]

    async def check_all_symbols(self, db: Session) -> Dict[str, Dict[str, float]]:
        """
        Score every distinct symbol present in the Kline table.

        Increments ``total_checks`` once per symbol.

        Args:
            db: database session

        Returns:
            Dict[str, Dict[str, float]]: symbol -> result of
            ``calculate_overall_score``
        """
        symbols = [row[0] for row in db.query(Kline.symbol).distinct().all()]

        results = {}
        for symbol in symbols:
            results[symbol] = await self.calculate_overall_score(db, symbol)
            self.total_checks += 1

        return results

    async def _record_issue(
        self,
        symbol: str,
        metric: QualityMetric,
        value: float,
        threshold: float,
        level: QualityLevel,
        message: str,
    ):
        """
        Record a quality issue and log it as a warning.

        Appends the issue to ``quality_issues`` (dropping the oldest entries
        beyond ``max_issues``) and increments ``total_issues``.

        Args:
            symbol: instrument code
            metric: quality metric
            value: measured value
            threshold: threshold that was not met
            level: alert severity
            message: human-readable details
        """
        issue = {
            "symbol": symbol,
            "metric": metric.value,
            "value": value,
            "threshold": threshold,
            "level": level.value,
            "message": message,
            "time": datetime.now().isoformat(),
        }

        self.quality_issues.append(issue)
        self.total_issues += 1

        # Bound memory: issues accumulate on every periodic run, so an
        # unbounded list would grow for the lifetime of the process.
        if self.max_issues is not None:
            overflow = len(self.quality_issues) - self.max_issues
            if overflow > 0:
                del self.quality_issues[:overflow]

        logger.warning(f"⚠️ 数据质量问题: {message}")

    def get_issues(self, limit: int = 100) -> List[dict]:
        """
        Return the most recent issues.

        Args:
            limit: maximum number of issues; 0 or negative returns an empty list

        Returns:
            List[dict]: up to ``limit`` newest issues, oldest first (a copy)
        """
        # Guard: slicing with [-0:] would return the whole list.
        if limit <= 0:
            return []
        return self.quality_issues[-limit:]

    def get_statistics(self) -> dict:
        """Return counters: total checks, total issues, and number of scored symbols."""
        return {
            "total_checks": self.total_checks,
            "total_issues": self.total_issues,
            "cached_symbols": len(self.quality_scores),
        }
|
|
|
|
|
|
|
|
|
# Global quality-monitor instance; quality_check_task below runs its checks.
quality_monitor = QualityMonitor()
|
|
|
|
|
|
|
|
|
# ============== Scheduled tasks ==============


async def quality_check_task(db: Session):
    """
    Scheduled data-quality check task.

    Intended to run once per minute. Checks every symbol via the global
    ``quality_monitor`` and logs how many symbols were checked and how many
    issues this run recorded.

    Args:
        db: database session

    Returns:
        Dict[str, Dict[str, float]]: per-symbol scores, as returned by
        ``quality_monitor.check_all_symbols``.
    """
    logger.info("🔄 QualityMonitor 开始检查...")

    # Count only issues recorded during this run: the monitor's issue list is
    # cumulative across runs, so its length would overstate this run's findings.
    issues_before = quality_monitor.total_issues

    results = await quality_monitor.check_all_symbols(db)

    new_issues = quality_monitor.total_issues - issues_before

    logger.info(f"📊 QualityMonitor 检查完成: 检查 {len(results)} 个品种, 发现 {new_issues} 个问题")

    return results