# backend/app/services/quality_monitor.py
"""
Data quality monitoring service.

Monitors four dimensions -- completeness, accuracy, timeliness and
consistency -- with the goal of surfacing problems in under one minute.
"""
import asyncio
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from enum import Enum

from sqlalchemy.orm import Session
from sqlalchemy import func, and_

from app.models.kline import Kline
from app.services.cache_service import cache_service
from app.config import settings
import logging

logger = logging.getLogger(__name__)

# Expected number of bars per trading day for each period.  The 1m figure
# assumes roughly 4 trading hours per day (240 bars); "1w" is about one bar
# per week expressed per day.
_EXPECTED_BARS_PER_DAY = {
    "1m": 240,
    "5m": 48,
    "15m": 16,
    "30m": 8,
    "60m": 4,
    "1d": 1,
    "1w": 0.14,
}
# Unknown periods are treated like 1-minute data.
_DEFAULT_BARS_PER_DAY = 240

# Number of trading days assumed inside the 7-calendar-day lookback window.
_TRADING_DAYS_PER_WEEK = 5

# Maximum acceptable age (seconds) of the newest bar for each period.
_EXPECTED_DELAY_SECONDS = {
    "1m": 60,
    "5m": 300,
    "15m": 900,
    "30m": 1800,
    "60m": 3600,
    "1d": 86400,
    "1w": 604800,
}
# Unknown periods are treated like 1-minute data.
_DEFAULT_DELAY_SECONDS = 60


class QualityMetric(str, Enum):
    """Quality dimensions that can be scored."""

    COMPLETENESS = "completeness"  # missing data
    ACCURACY = "accuracy"  # abnormal prices
    TIMELINESS = "timeliness"  # stale data
    CONSISTENCY = "consistency"  # cache vs. database


class QualityLevel(str, Enum):
    """Alert severity levels."""

    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"


class QualityMonitor:
    """
    Data quality monitoring service.

    Features:
    - completeness check (missing data)
    - accuracy check (abnormal price moves)
    - timeliness check (data delay)
    - consistency check (cache vs. database)
    - quality score calculation
    - issue recording / alert logging

    Scores are on a 0-100 scale and the latest scores per symbol are kept
    in memory in ``quality_scores``.
    """

    def __init__(self, max_issues: int = 10_000):
        """
        Create an empty monitor.

        Args:
            max_issues: maximum number of issue records retained in
                ``quality_issues``; the oldest records are dropped first so
                a long-running periodic task cannot grow the list forever.
        """
        # Latest scores per symbol (symbol -> scores).
        self.quality_scores: Dict[str, Dict[str, float]] = {}

        # Recorded issues, oldest first, capped at ``max_issues`` entries.
        self.quality_issues: List[dict] = []
        self.max_issues = max_issues

        # Cumulative counters (never reset, unaffected by the cap above).
        self.total_checks = 0
        self.total_issues = 0

    async def check_completeness(self, db: Session, symbol: str, period: str = "1m") -> float:
        """
        Score how complete the stored bars are for the last 7 days.

        The expected bar count is ``bars per day for the period * 5 trading
        days``; the score is ``actual / expected * 100`` capped at 100.
        A WARNING issue is recorded when the score is below 80.

        Args:
            db: database session
            symbol: instrument code
            period: bar period (e.g. "1m", "5m", "1d", "1w")

        Returns:
            float: completeness score (0-100); 0.0 if the check itself fails
        """
        try:
            # Lookback window: the most recent 7 calendar days.
            start_time = datetime.now() - timedelta(days=7)

            expected_per_day = _EXPECTED_BARS_PER_DAY.get(period, _DEFAULT_BARS_PER_DAY)
            expected_count = expected_per_day * _TRADING_DAYS_PER_WEEK

            actual_count = db.query(func.count(Kline.id)).filter(
                and_(
                    Kline.symbol == symbol,
                    Kline.period == period,
                    Kline.time >= start_time
                )
            ).scalar() or 0

            # Nothing expected means nothing can be missing.
            if expected_count == 0:
                return 100.0

            completeness = min((actual_count / expected_count) * 100, 100.0)

            if completeness < 80:
                await self._record_issue(
                    symbol,
                    QualityMetric.COMPLETENESS,
                    completeness,
                    80,
                    QualityLevel.WARNING,
                    f"数据完整性不足:预期 {expected_count} 条,实际 {actual_count} 条"
                )

            return completeness

        except Exception as e:
            # Best-effort check: log with traceback and report the worst score.
            logger.exception(f"❌ 完整性检测失败: {e}")
            return 0.0

    async def check_accuracy(self, db: Session, symbol: str) -> float:
        """
        Score price accuracy for the last 7 days.

        A bar is counted as abnormal when ``|change_percent| > 10``.  The
        score is the share of non-abnormal bars times 100.  Bars of all
        periods for the symbol are counted.  A WARNING issue is recorded
        when the score is below 95.

        Args:
            db: database session
            symbol: instrument code

        Returns:
            float: accuracy score (0-100); 100.0 when there are no bars,
            0.0 if the check itself fails
        """
        try:
            start_time = datetime.now() - timedelta(days=7)

            # func.abs renders SQL ABS(); Python's builtin abs() on a column
            # expression is not guaranteed to be supported by SQLAlchemy.
            abnormal_count = db.query(func.count(Kline.id)).filter(
                and_(
                    Kline.symbol == symbol,
                    func.abs(Kline.change_percent) > 10,
                    Kline.time >= start_time
                )
            ).scalar() or 0

            total_count = db.query(func.count(Kline.id)).filter(
                and_(
                    Kline.symbol == symbol,
                    Kline.time >= start_time
                )
            ).scalar() or 0

            if total_count == 0:
                return 100.0

            accuracy = max((1 - abnormal_count / total_count) * 100, 0.0)

            if accuracy < 95:
                await self._record_issue(
                    symbol,
                    QualityMetric.ACCURACY,
                    accuracy,
                    95,
                    QualityLevel.WARNING,
                    f"数据准确性不足:发现 {abnormal_count} 条异常数据"
                )

            return accuracy

        except Exception as e:
            logger.exception(f"❌ 准确性检测失败: {e}")
            return 0.0

    async def check_timeliness(self, db: Session, symbol: str, period: str = "1m") -> float:
        """
        Score how fresh the newest bar is.

        The newest bar may be up to one period old without penalty; beyond
        that the score drops by 1 point per extra minute, down to 0.  An
        issue is recorded when the score is below 80 (WARNING above 50,
        CRITICAL otherwise), and a CRITICAL issue when no bar exists.

        Args:
            db: database session
            symbol: instrument code
            period: bar period (e.g. "1m", "5m", "1d", "1w")

        Returns:
            float: timeliness score (0-100); 0.0 if there is no data or the
            check itself fails
        """
        try:
            latest = db.query(Kline).filter(
                and_(
                    Kline.symbol == symbol,
                    Kline.period == period
                )
            ).order_by(Kline.time.desc()).first()

            if not latest:
                await self._record_issue(
                    symbol,
                    QualityMetric.TIMELINESS,
                    0,
                    80,
                    QualityLevel.CRITICAL,
                    f"无 {period} 周期数据"
                )
                return 0.0

            # NOTE(review): assumes Kline.time is a naive datetime in the
            # same local time zone as datetime.now() -- confirm.
            delay = (datetime.now() - latest.time).total_seconds()

            expected_delay = _EXPECTED_DELAY_SECONDS.get(period, _DEFAULT_DELAY_SECONDS)

            if delay <= expected_delay:
                timeliness = 100.0
            else:
                # Deduct one point per minute beyond the allowed delay.
                excess_delay = delay - expected_delay
                timeliness = max(0.0, 100 - excess_delay / 60)

            if timeliness < 80:
                level = QualityLevel.WARNING if timeliness > 50 else QualityLevel.CRITICAL
                await self._record_issue(
                    symbol,
                    QualityMetric.TIMELINESS,
                    timeliness,
                    80,
                    level,
                    f"数据延迟:最新数据时间为 {latest.time.strftime('%Y-%m-%d %H:%M:%S')}, 延迟 {delay:.0f} 秒"
                )

            return timeliness

        except Exception as e:
            logger.exception(f"❌ 及时性检测失败: {e}")
            return 0.0

    async def check_consistency(self, db: Session, symbol: str) -> float:
        """
        Compare the cached latest quote with the newest bar in the database.

        The check is all-or-nothing: 100 when the cached ``price`` matches
        the newest bar's ``close`` within tolerance, otherwise 0 and a
        WARNING issue is recorded.  When either side has no data the result
        is treated as consistent.

        Args:
            db: database session
            symbol: instrument code

        Returns:
            float: consistency score (0 or 100); 0.0 if the check itself fails
        """
        try:
            cache_data = await cache_service.get_latest_quote(symbol)

            # Simplification: the newest Kline row (any period) stands in
            # for the database's latest quote.
            latest = db.query(Kline).filter(
                Kline.symbol == symbol
            ).order_by(Kline.time.desc()).first()

            if not cache_data or not latest:
                return 100.0  # nothing to compare -> treated as consistent

            cache_price = float(cache_data.get("price", 0))
            db_price = float(latest.close)

            # Relative tolerance of 0.01%.  The denominator is floored at 1
            # to avoid dividing by zero, so for prices below 1 this is an
            # absolute tolerance of 0.0001.
            if abs(cache_price - db_price) / max(db_price, 1) < 0.0001:
                return 100.0

            await self._record_issue(
                symbol,
                QualityMetric.CONSISTENCY,
                0,
                100,
                QualityLevel.WARNING,
                f"缓存与数据库不一致:缓存价格 {cache_price}, 数据库价格 {db_price}"
            )

            return 0.0

        except Exception as e:
            logger.exception(f"❌ 一致性检测失败: {e}")
            return 0.0

    async def calculate_overall_score(self, db: Session, symbol: str) -> Dict[str, float]:
        """
        Run all four checks for one symbol and cache the result.

        Completeness and timeliness are evaluated on the "1m" period.  The
        overall score is the unweighted mean of the four scores.

        Args:
            db: database session
            symbol: instrument code

        Returns:
            Dict[str, float]: the four scores plus "overall"; the dict also
            carries a "time" key holding an ISO-8601 timestamp string
        """
        completeness = await self.check_completeness(db, symbol, "1m")
        accuracy = await self.check_accuracy(db, symbol)
        timeliness = await self.check_timeliness(db, symbol, "1m")
        consistency = await self.check_consistency(db, symbol)

        overall = (completeness + accuracy + timeliness + consistency) / 4

        scores = {
            "completeness": completeness,
            "accuracy": accuracy,
            "timeliness": timeliness,
            "consistency": consistency,
            "overall": overall,
            "time": datetime.now().isoformat()
        }
        self.quality_scores[symbol] = scores

        return scores

    async def check_all_symbols(self, db: Session) -> Dict[str, Dict[str, float]]:
        """
        Score every distinct symbol present in the Kline table.

        Symbols are processed one after another; ``total_checks`` is
        incremented once per symbol.

        Args:
            db: database session

        Returns:
            Dict[str, Dict[str, float]]: scores keyed by symbol
        """
        # Each row is a 1-tuple holding the symbol.
        symbols = [row[0] for row in db.query(Kline.symbol).distinct().all()]

        results = {}
        for symbol in symbols:
            results[symbol] = await self.calculate_overall_score(db, symbol)
            self.total_checks += 1

        return results

    async def _record_issue(
        self,
        symbol: str,
        metric: QualityMetric,
        value: float,
        threshold: float,
        level: QualityLevel,
        message: str
    ):
        """
        Append an issue record, bump the counter and log a warning.

        Only the newest ``max_issues`` records are retained;
        ``total_issues`` keeps counting every issue ever recorded.

        Args:
            symbol: instrument code
            metric: quality dimension that failed
            value: measured score
            threshold: score below which the issue is raised
            level: alert severity
            message: human-readable details
        """
        issue = {
            "symbol": symbol,
            "metric": metric.value,
            "value": value,
            "threshold": threshold,
            "level": level.value,
            "message": message,
            "time": datetime.now().isoformat()
        }

        self.quality_issues.append(issue)
        self.total_issues += 1

        # Drop the oldest records once the cap is exceeded.  getattr keeps
        # instances created without __init__ working (no cap in that case).
        max_issues = getattr(self, "max_issues", None)
        if max_issues is not None:
            overflow = len(self.quality_issues) - max(max_issues, 0)
            if overflow > 0:
                del self.quality_issues[:overflow]

        logger.warning(f"⚠️ 数据质量问题: {message}")

    def get_issues(self, limit: int = 100) -> List[dict]:
        """
        Return the most recent issues, oldest first.

        Args:
            limit: maximum number of issues to return; zero or a negative
                value yields an empty list

        Returns:
            List[dict]: up to ``limit`` of the newest issue records
        """
        # Guard explicitly: a slice of [-0:] would return the whole list.
        if limit <= 0:
            return []
        return self.quality_issues[-limit:]

    def get_statistics(self) -> dict:
        """Return cumulative check/issue counters and the cached symbol count."""
        return {
            "total_checks": self.total_checks,
            "total_issues": self.total_issues,
            "cached_symbols": len(self.quality_scores),
        }


# Global quality monitor instance.
quality_monitor = QualityMonitor()


# ============== Scheduled task ==============

async def quality_check_task(db: Session):
    """
    Scheduled data quality check, intended to run once per minute.

    Checks every symbol and logs how many symbols were checked and how many
    issues were recorded during this run.

    Args:
        db: database session

    Returns:
        Scores keyed by symbol, as returned by ``check_all_symbols``.
    """
    logger.info("🔄 QualityMonitor 开始检查...")

    # Snapshot the cumulative counter so only this run's issues are reported.
    issues_before = quality_monitor.total_issues

    results = await quality_monitor.check_all_symbols(db)

    total_issues = quality_monitor.total_issues - issues_before

    logger.info(f"📊 QualityMonitor 检查完成: 检查 {len(results)} 个品种, 发现 {total_issues} 个问题")

    return results