You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

257 lines
6.8 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package monitor
import (
"context"
"fmt"
"log"
"time"
"market-data-service/api"
"market-data-service/internal/repository"
)
// Monitor 数据质量监控
type Monitor struct {
db *repository.DB
stockRepo *repository.StockRepository
futuresRepo *repository.FuturesRepository
sender AlertSender
}
// AlertSender 告警发送接口
type AlertSender interface {
SendAlert(title, content string) error
}
// NewMonitor 创建监控器
func NewMonitor(db *repository.DB, stockRepo *repository.StockRepository, futuresRepo *repository.FuturesRepository, sender AlertSender) *Monitor {
return &Monitor{
db: db,
stockRepo: stockRepo,
futuresRepo: futuresRepo,
sender: sender,
}
}
// CheckResult 检查结果
type CheckResult struct {
Symbol string
Freq string
CheckDate string
CheckType string
Status string // pass/fail
ExpectCount int
ActualCount int
Detail string
}
// DailyCheck 每日数据质量检查
func (m *Monitor) DailyCheck(ctx context.Context, checkDate string) error {
log.Printf("Starting daily data quality check for %s", checkDate)
// 检查股票数据
if err := m.checkStockData(ctx, checkDate); err != nil {
log.Printf("Stock data check failed: %v", err)
}
// 检查期货数据
if err := m.checkFuturesData(ctx, checkDate); err != nil {
log.Printf("Futures data check failed: %v", err)
}
log.Printf("Daily data quality check completed")
return nil
}
// checkStockData 检查股票数据质量
func (m *Monitor) checkStockData(ctx context.Context, checkDate string) error {
// 1. 获取所有活跃股票
symbols, _, err := m.stockRepo.ListSymbols(ctx, &api.SymbolListRequest{
Page: 1,
Size: 10000,
})
if err != nil {
return err
}
// 2. 检查1分钟线完整性240条
for _, symbol := range symbols {
result := m.checkKLineCompleteness(ctx, "stock", symbol.SymbolID, "1m", checkDate, 240)
if err := m.saveCheckResult(ctx, "stock", result); err != nil {
log.Printf("Failed to save check result: %v", err)
}
}
return nil
}
// checkFuturesData 检查期货数据质量
func (m *Monitor) checkFuturesData(ctx context.Context, checkDate string) error {
// 获取所有活跃期货合约
symbols, _, err := m.futuresRepo.ListSymbols(ctx, &api.SymbolListRequest{
Page: 1,
Size: 10000,
})
if err != nil {
return err
}
// 检查1分钟线完整性
for _, symbol := range symbols {
result := m.checkKLineCompleteness(ctx, "futures", symbol.SymbolID, "1m", checkDate, 240)
if err := m.saveCheckResult(ctx, "futures", result); err != nil {
log.Printf("Failed to save check result: %v", err)
}
}
return nil
}
// checkKLineCompleteness 检查K线完整性
func (m *Monitor) checkKLineCompleteness(ctx context.Context, assetType, symbol, freq, checkDate string, expectCount int) CheckResult {
result := CheckResult{
Symbol: symbol,
Freq: freq,
CheckDate: checkDate,
CheckType: "missing",
Status: "pass",
}
// 解析日期
start, _ := time.Parse("20060102", checkDate)
end := start.Add(24 * time.Hour).Add(-time.Second)
var actualCount int
var err error
if assetType == "stock" {
items, err := m.stockRepo.GetKLines(ctx, symbol, api.Frequency(freq), start, end, api.AdjustNone)
if err == nil {
actualCount = len(items)
}
} else {
items, err := m.futuresRepo.GetKLines(ctx, symbol, api.Frequency(freq), start, end)
if err == nil {
actualCount = len(items)
}
}
if err != nil {
result.Status = "fail"
result.Detail = fmt.Sprintf("Error querying data: %v", err)
return result
}
result.ExpectCount = expectCount
result.ActualCount = actualCount
// 判断缺失情况
if actualCount < expectCount {
result.Status = "fail"
result.Detail = fmt.Sprintf("Data missing: expected %d, actual %d", expectCount, actualCount)
// 发送告警
if m.sender != nil {
title := fmt.Sprintf("[%s] Data Missing Alert", assetType)
content := fmt.Sprintf("Symbol: %s, Date: %s, Expected: %d, Actual: %d",
symbol, checkDate, expectCount, actualCount)
m.sender.SendAlert(title, content)
}
}
return result
}
// saveCheckResult 保存检查结果
func (m *Monitor) saveCheckResult(ctx context.Context, assetType string, result CheckResult) error {
// 使用Repository保存到data_quality_checks表
query := fmt.Sprintf(`
INSERT INTO %s.data_quality_checks
(check_date, symbol_id, freq, check_type, status, expect_count, actual_count, detail)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (check_date, symbol_id, freq, check_type) DO UPDATE SET
status = EXCLUDED.status,
expect_count = EXCLUDED.expect_count,
actual_count = EXCLUDED.actual_count,
detail = EXCLUDED.detail,
created_at = NOW()
`, assetType)
_, err := m.db.ExecContext(ctx, query,
result.CheckDate, result.Symbol, result.Freq,
result.CheckType, result.Status, result.ExpectCount,
result.ActualCount, result.Detail)
return err
}
// GetQualityReport 获取数据质量报告
func (m *Monitor) GetQualityReport(ctx context.Context, assetType, checkDate string) (*QualityReport, error) {
query := fmt.Sprintf(`
SELECT
COUNT(*) as total_checks,
SUM(CASE WHEN status = 'pass' THEN 1 ELSE 0 END) as pass_count,
SUM(CASE WHEN status = 'fail' THEN 1 ELSE 0 END) as fail_count
FROM %s.data_quality_checks
WHERE check_date = $1
`, assetType)
var report QualityReport
report.CheckDate = checkDate
report.AssetType = assetType
row := m.db.QueryRowContext(ctx, query, checkDate)
err := row.Scan(&report.TotalChecks, &report.PassCount, &report.FailCount)
if err != nil {
return nil, err
}
if report.TotalChecks > 0 {
report.PassRate = float64(report.PassCount) / float64(report.TotalChecks) * 100
}
return &report, nil
}
// QualityReport 数据质量报告
type QualityReport struct {
AssetType string `json:"asset_type"`
CheckDate string `json:"check_date"`
TotalChecks int `json:"total_checks"`
PassCount int `json:"pass_count"`
FailCount int `json:"fail_count"`
PassRate float64 `json:"pass_rate"`
}
// LogAlertSender 日志告警发送器
type LogAlertSender struct{}
// SendAlert 发送告警到日志
func (s *LogAlertSender) SendAlert(title, content string) error {
log.Printf("[ALERT] %s: %s", title, content)
return nil
}
// StartDailyCheckCron 启动每日检查定时任务
func (m *Monitor) StartDailyCheckCron(ctx context.Context) {
go func() {
for {
// 计算到下一个盘后的时间假设15:30
now := time.Now()
next := time.Date(now.Year(), now.Month(), now.Day(), 15, 35, 0, 0, now.Location())
if next.Before(now) {
next = next.Add(24 * time.Hour)
}
timer := time.NewTimer(next.Sub(now))
<-timer.C
// 执行检查
checkDate := now.Format("20060102")
if err := m.DailyCheck(ctx, checkDate); err != nil {
log.Printf("Daily check failed: %v", err)
}
}
}()
}