You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

439 lines
13 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# backend/app/services/quality_monitor.py
"""
数据质量监控服务
支持完整性、准确性、及时性、一致性四维监控,问题发现<1分钟
"""
import asyncio
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from enum import Enum
from sqlalchemy.orm import Session
from sqlalchemy import func, and_
from app.models.kline import Kline
from app.services.cache_service import cache_service
from app.config import settings
import logging
logger = logging.getLogger(__name__)
class QualityMetric(str, Enum):
    """Quality metric: the dimensions a data-quality check can report on.

    Members are also ``str`` instances, so they compare equal to their
    string values.
    """

    COMPLETENESS = "completeness"  # completeness
    ACCURACY = "accuracy"  # accuracy
    TIMELINESS = "timeliness"  # timeliness
    CONSISTENCY = "consistency"  # consistency
class QualityLevel(str, Enum):
    """Alert level attached to a recorded quality issue.

    Members are also ``str`` instances, so they compare equal to their
    string values.
    """

    INFO = "info"  # informational
    WARNING = "warning"  # warning
    CRITICAL = "critical"  # critical
class QualityMonitor:
    """
    Data quality monitoring service.

    Features:
    - completeness check (missing data)
    - accuracy check (abnormal price moves)
    - timeliness check (data delay)
    - consistency check (cache vs. database)
    - quality score calculation
    - issue recording (each issue is logged as a warning)

    Per-symbol scores and the recorded issues are kept in memory on the
    instance.
    """

    # Expected number of bars per trading day for each period. The "1m"
    # figure assumes roughly 4 trading hours per day (240 bars); "1w" is
    # about one bar per week expressed per day.
    _EXPECTED_PER_DAY = {
        "1m": 240,
        "5m": 48,
        "15m": 16,
        "30m": 8,
        "60m": 4,
        "1d": 1,
        "1w": 0.14,
    }
    # Unknown periods are treated like "1m".
    _DEFAULT_EXPECTED_PER_DAY = 240

    # Tolerated age (seconds) of the newest bar for each period.
    _EXPECTED_DELAY_SECONDS = {
        "1m": 60,
        "5m": 300,
        "15m": 900,
        "30m": 1800,
        "60m": 3600,
        "1d": 86400,
    }
    # Unknown periods are treated like "1m".
    _DEFAULT_EXPECTED_DELAY_SECONDS = 60

    # Upper bound on retained issues so a long-running periodic check
    # cannot grow the in-memory list without limit.
    MAX_ISSUES = 10_000

    def __init__(self):
        # Latest scores per symbol (symbol -> scores)
        self.quality_scores: Dict[str, Dict[str, float]] = {}
        # Recorded issues, oldest first (at most MAX_ISSUES are retained)
        self.quality_issues: List[dict] = []
        # Counters; total_issues is cumulative and is not reduced by trimming
        self.total_checks = 0
        self.total_issues = 0

    async def check_completeness(self, db: Session, symbol: str, period: str = "1m") -> float:
        """
        Score completeness by comparing stored bars with the expected count.

        Bars from the last 7 days are counted and compared with
        ``expected_per_day * 5`` (5 trading days per week assumed).

        Args:
            db: database session
            symbol: instrument code
            period: bar period

        Returns:
            float: completeness score (0-100); 0.0 if the check itself fails
        """
        try:
            # Look-back window: the last 7 days
            start_time = datetime.now() - timedelta(days=7)

            expected_per_day = self._EXPECTED_PER_DAY.get(
                period, self._DEFAULT_EXPECTED_PER_DAY
            )
            # Trading days in the window (assumes 5 per week)
            trading_days = 5
            expected_count = expected_per_day * trading_days

            # Actual number of stored bars; guard against a None scalar
            actual_count = db.query(func.count(Kline.id)).filter(
                and_(
                    Kline.symbol == symbol,
                    Kline.period == period,
                    Kline.time >= start_time
                )
            ).scalar() or 0

            if expected_count == 0:
                return 100.0

            # Cap at 100: more bars than expected is not "more complete"
            completeness = min((actual_count / expected_count) * 100, 100.0)

            if completeness < 80:
                await self._record_issue(
                    symbol, QualityMetric.COMPLETENESS,
                    completeness, 80,
                    QualityLevel.WARNING,
                    f"数据完整性不足:预期 {expected_count} 条,实际 {actual_count}"
                )
            return completeness
        except Exception as e:
            # Best effort: a failed check is logged and scored as 0
            logger.error(f"❌ 完整性检测失败: {e}")
            return 0.0

    async def check_accuracy(self, db: Session, symbol: str) -> float:
        """
        Score accuracy as the share of bars without an abnormal price move.

        A bar counts as abnormal when the absolute value of its
        ``change_percent`` exceeds 10. Bars of all periods from the last
        7 days are considered.

        Args:
            db: database session
            symbol: instrument code

        Returns:
            float: accuracy score (0-100); 0.0 if the check itself fails
        """
        try:
            # Look-back window: the last 7 days
            start_time = datetime.now() - timedelta(days=7)

            # Use SQL ABS() via func.abs: the Python builtin abs() cannot
            # be applied to a column expression.
            abnormal_count = db.query(func.count(Kline.id)).filter(
                and_(
                    Kline.symbol == symbol,
                    func.abs(Kline.change_percent) > 10,
                    Kline.time >= start_time
                )
            ).scalar() or 0

            total_count = db.query(func.count(Kline.id)).filter(
                and_(
                    Kline.symbol == symbol,
                    Kline.time >= start_time
                )
            ).scalar() or 0

            # No data in the window: nothing can be inaccurate
            if total_count == 0:
                return 100.0

            accuracy = max((1 - abnormal_count / total_count) * 100, 0.0)

            if accuracy < 95:
                await self._record_issue(
                    symbol, QualityMetric.ACCURACY,
                    accuracy, 95,
                    QualityLevel.WARNING,
                    f"数据准确性不足:发现 {abnormal_count} 条异常数据"
                )
            return accuracy
        except Exception as e:
            # Best effort: a failed check is logged and scored as 0
            logger.error(f"❌ 准确性检测失败: {e}")
            return 0.0

    async def check_timeliness(self, db: Session, symbol: str, period: str = "1m") -> float:
        """
        Score timeliness from the age of the newest bar.

        The score is 100 while the newest bar is no older than the
        tolerated delay for the period, then drops by 1 point per extra
        minute, down to 0.

        Args:
            db: database session
            symbol: instrument code
            period: bar period

        Returns:
            float: timeliness score (0-100); 0.0 when no bar exists or the
            check itself fails
        """
        try:
            # Newest bar for this symbol/period
            latest = db.query(Kline).filter(
                and_(
                    Kline.symbol == symbol,
                    Kline.period == period
                )
            ).order_by(Kline.time.desc()).first()

            if not latest:
                # NOTE(review): this message text looks truncated; confirm
                # the intended wording before changing it.
                await self._record_issue(
                    symbol, QualityMetric.TIMELINESS,
                    0, 80,
                    QualityLevel.CRITICAL,
                    f"{period} 周期数据"
                )
                return 0.0

            # Age of the newest bar in seconds
            delay = (datetime.now() - latest.time).total_seconds()
            expected_delay = self._EXPECTED_DELAY_SECONDS.get(
                period, self._DEFAULT_EXPECTED_DELAY_SECONDS
            )

            if delay <= expected_delay:
                timeliness = 100.0
            else:
                # Deduct 1 point per minute beyond the tolerated delay
                excess_delay = delay - expected_delay
                timeliness = max(0.0, 100 - excess_delay / 60)

            if timeliness < 80:
                level = QualityLevel.WARNING if timeliness > 50 else QualityLevel.CRITICAL
                await self._record_issue(
                    symbol, QualityMetric.TIMELINESS,
                    timeliness, 80,
                    level,
                    f"数据延迟:最新数据时间为 {latest.time.strftime('%Y-%m-%d %H:%M:%S')}, 延迟 {delay:.0f}"
                )
            return timeliness
        except Exception as e:
            # Best effort: a failed check is logged and scored as 0
            logger.error(f"❌ 及时性检测失败: {e}")
            return 0.0

    async def check_consistency(self, db: Session, symbol: str) -> float:
        """
        Score consistency between the cached quote and the database.

        The cached "price" is compared with the close of the newest Kline
        row (any period). The score is all-or-nothing.

        Args:
            db: database session
            symbol: instrument code

        Returns:
            float: 100.0 when the prices agree or either side has no data,
            otherwise 0.0 (also 0.0 if the check itself fails)
        """
        try:
            # Latest quote from the cache
            cache_data = await cache_service.get_latest_quote(symbol)

            # Latest price from the database (simplified: newest Kline row)
            latest = db.query(Kline).filter(
                Kline.symbol == symbol
            ).order_by(Kline.time.desc()).first()

            if not cache_data or not latest:
                return 100.0  # treated as consistent when data is missing

            cache_price = float(cache_data.get("price", 0))
            db_price = float(latest.close)

            # Allow a 0.01% relative error; the denominator is floored at 1
            if abs(cache_price - db_price) / max(db_price, 1) < 0.0001:
                return 100.0

            await self._record_issue(
                symbol, QualityMetric.CONSISTENCY,
                0, 100,
                QualityLevel.WARNING,
                f"缓存与数据库不一致:缓存价格 {cache_price}, 数据库价格 {db_price}"
            )
            return 0.0
        except Exception as e:
            # Best effort: a failed check is logged and scored as 0
            logger.error(f"❌ 一致性检测失败: {e}")
            return 0.0

    async def calculate_overall_score(self, db: Session, symbol: str) -> Dict[str, float]:
        """
        Run all four checks for a symbol and store the result.

        Completeness and timeliness are checked for the "1m" period. The
        overall score is the unweighted mean of the four scores.

        Args:
            db: database session
            symbol: instrument code

        Returns:
            Dict[str, float]: the four scores plus "overall", and a "time"
            entry holding an ISO-format timestamp string
        """
        completeness = await self.check_completeness(db, symbol, "1m")
        accuracy = await self.check_accuracy(db, symbol)
        timeliness = await self.check_timeliness(db, symbol, "1m")
        consistency = await self.check_consistency(db, symbol)

        # Overall score: plain average
        overall = (completeness + accuracy + timeliness + consistency) / 4

        # Keep the latest result per symbol
        self.quality_scores[symbol] = {
            "completeness": completeness,
            "accuracy": accuracy,
            "timeliness": timeliness,
            "consistency": consistency,
            "overall": overall,
            "time": datetime.now().isoformat()
        }
        return self.quality_scores[symbol]

    async def check_all_symbols(self, db: Session) -> Dict[str, Dict[str, float]]:
        """
        Score every distinct symbol found in the Kline table.

        Symbols are processed one after another; ``total_checks`` is
        incremented once per symbol.

        Args:
            db: database session

        Returns:
            Dict[str, Dict[str, float]]: scores keyed by symbol
        """
        # Each row is a one-element tuple holding the symbol
        symbols = [row[0] for row in db.query(Kline.symbol).distinct().all()]

        results = {}
        for symbol in symbols:
            results[symbol] = await self.calculate_overall_score(db, symbol)
            self.total_checks += 1
        return results

    async def _record_issue(
        self,
        symbol: str,
        metric: QualityMetric,
        value: float,
        threshold: float,
        level: QualityLevel,
        message: str
    ):
        """
        Record a quality issue and log it as a warning.

        Args:
            symbol: instrument code
            metric: quality metric that failed
            value: measured value
            threshold: threshold the value was compared with
            level: alert level
            message: human-readable details
        """
        issue = {
            "symbol": symbol,
            "metric": metric.value,
            "value": value,
            "threshold": threshold,
            "level": level.value,
            "message": message,
            "time": datetime.now().isoformat()
        }
        self.quality_issues.append(issue)
        self.total_issues += 1

        # Drop the oldest entries in place once the cap is exceeded, so
        # existing references to the list stay valid.
        overflow = len(self.quality_issues) - self.MAX_ISSUES
        if overflow > 0:
            del self.quality_issues[:overflow]

        logger.warning(f"⚠️ 数据质量问题: {message}")

    def get_issues(self, limit: int = 100) -> List[dict]:
        """
        Return the most recently recorded issues.

        Args:
            limit: maximum number of issues to return

        Returns:
            List[dict]: up to ``limit`` newest issues, oldest first; an
            empty list when ``limit`` is zero or negative
        """
        # A slice of [-0:] would return the whole list, so handle
        # non-positive limits explicitly.
        if limit <= 0:
            return []
        return self.quality_issues[-limit:]

    def get_statistics(self) -> dict:
        """Return the check/issue counters and the number of scored symbols."""
        return {
            "total_checks": self.total_checks,
            "total_issues": self.total_issues,
            "cached_symbols": len(self.quality_scores),
        }
# Module-level shared QualityMonitor instance
quality_monitor = QualityMonitor()
# ============== 定时任务 ==============
async def quality_check_task(db: Session):
    """
    Periodic data-quality check task.

    Intended to be run once per minute. Checks every symbol through the
    module-level ``quality_monitor`` and logs a summary.

    Args:
        db: database session

    Returns:
        The per-symbol scores returned by ``check_all_symbols``.
    """
    logger.info("🔄 QualityMonitor 开始检查...")

    # Snapshot the cumulative counter so the summary reports only the
    # issues found by this run, not the whole retained history.
    issues_before = quality_monitor.total_issues

    # Check all symbols
    results = await quality_monitor.check_all_symbols(db)

    new_issues = quality_monitor.total_issues - issues_before
    logger.info(f"📊 QualityMonitor 检查完成: 检查 {len(results)} 个品种, 发现 {new_issues} 个问题")
    return results