You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

120 lines
2.7 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package service
import (
"context"
"fmt"
"time"
"github.com/google/uuid"
"market-data-service/api"
"market-data-service/internal/repository"
)
// AdminServiceImpl is the implementation of the admin service.
// NewAdminService returns a pointer to it as an AdminService.
type AdminServiceImpl struct {
	// db is the handle through which the service's methods run their
	// queries, updates and connectivity checks.
	db *repository.DB
}
// NewAdminService builds an admin service that issues its database calls
// through db. The concrete value behind the returned AdminService is an
// *AdminServiceImpl.
func NewAdminService(db *repository.DB) AdminService {
	svc := new(AdminServiceImpl)
	svc.db = db
	return svc
}
// GetDataSourceStatus reads every row of public.data_source_config and
// returns the active and standby sources configured for the "stock" and
// "futures" asset classes. Rows for any other asset class are skipped.
//
// Every reported entry is marked api.DataSourceHealthy; no health probe is
// performed here. On a query, scan or row-iteration error the result is nil
// and the returned error wraps the underlying one with the step that failed.
func (s *AdminServiceImpl) GetDataSourceStatus(ctx context.Context) (*api.DataSourceStatusData, error) {
	// Load the data source configuration stored in the database.
	const query = `
SELECT asset_class, active_source, standby_sources, updated_at
FROM public.data_source_config
`
	rows, err := s.db.QueryContext(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("querying data source config: %w", err)
	}
	defer rows.Close()

	data := &api.DataSourceStatusData{}
	for rows.Next() {
		var (
			assetClass, activeSource string
			standbySources           []string
			// updatedAt has to be scanned because the query selects it, but
			// it is not copied into the response.
			updatedAt time.Time
		)
		// NOTE(review): scanning standby_sources straight into a []string
		// relies on the driver behind repository.DB supporting that
		// destination type — confirm.
		if err := rows.Scan(&assetClass, &activeSource, &standbySources, &updatedAt); err != nil {
			return nil, fmt.Errorf("scanning data source config row: %w", err)
		}

		info := api.DataSourceInfo{
			ActiveSource:   activeSource,
			StandbySources: standbySources,
			Status:         api.DataSourceHealthy,
		}
		switch assetClass {
		case "stock":
			data.Stock = info
		case "futures":
			data.Futures = info
		}
	}
	// A failure while iterating means data may be incomplete, so do not hand
	// a partially filled result back together with the error.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterating data source config rows: %w", err)
	}
	return data, nil
}
// SwitchDataSource sets req.Source as the active source in
// public.data_source_config for the asset class named by req.AssetClass, or
// for both "stock" and "futures" when req.AssetClass is api.AssetAll.
//
// All targeted rows are changed by one UPDATE statement, so switching
// several asset classes does not depend on a sequence of separate statements
// that could fail halfway. A database error is returned wrapped; nil is
// returned otherwise, including when no row matched the asset class.
//
// req.SyncBackfill is accepted but has no effect yet (see the TODO at the
// end of the function).
func (s *AdminServiceImpl) SwitchDataSource(ctx context.Context, req *api.SourceSwitchRequest) error {
	var assetClasses []string
	if req.AssetClass == api.AssetAll {
		assetClasses = []string{"stock", "futures"}
	} else {
		assetClasses = []string{string(req.AssetClass)}
	}

	// $1 carries the new source; $2, $3, ... carry the asset classes. Only
	// the generated placeholder list is spliced into the SQL text, never a
	// caller-supplied value.
	args := make([]interface{}, 0, len(assetClasses)+1)
	args = append(args, req.Source)
	placeholders := ""
	for i, ac := range assetClasses {
		if i > 0 {
			placeholders += ", "
		}
		placeholders += fmt.Sprintf("$%d", i+2)
		args = append(args, ac)
	}

	// Update the data source configuration.
	query := `
UPDATE public.data_source_config
SET active_source = $1, updated_at = NOW()
WHERE asset_class IN (` + placeholders + `)
`
	if _, err := s.db.ExecContext(ctx, query, args...); err != nil {
		return fmt.Errorf("failed to switch %s data source: %w", string(req.AssetClass), err)
	}

	// TODO: start the asynchronous backfill task when req.SyncBackfill is
	// set. No goroutine is started until that task exists.
	return nil
}
// BackfillData is the entry point for backfilling historical data. It
// currently only generates a new task ID and returns it with a nil error;
// ctx and req are not used.
func (s *AdminServiceImpl) BackfillData(ctx context.Context, req *api.BackfillRequest) (string, error) {
	// TODO: persist the backfill task in the database and start a background
	// worker to execute it. For now this is simplified to handing back a
	// freshly generated task ID.
	id := uuid.New()
	return id.String(), nil
}
// HealthCheck pings the database and, when the ping succeeds, returns a
// response with status "healthy" stamped with the current time. When the
// ping fails it returns a nil response and an error wrapping the ping error.
func (s *AdminServiceImpl) HealthCheck(ctx context.Context) (*api.HealthResponse, error) {
	// The database connection is the only thing probed here.
	err := s.db.PingContext(ctx)
	if err != nil {
		return nil, fmt.Errorf("database connection failed: %w", err)
	}

	resp := new(api.HealthResponse)
	resp.Status = "healthy"
	resp.Timestamp = time.Now()
	return resp, nil
}