You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

181 lines
5.2 KiB

package service
import (
"context"
"fmt"
"time"
"market-data-service/adapter"
"market-data-service/api"
"market-data-service/internal/repository"
)
// FuturesServiceImpl 期货服务实现
type FuturesServiceImpl struct {
repo *repository.FuturesRepository
adapter adapter.DataSourceAdapter
}
// NewFuturesService 创建期货服务
func NewFuturesService(repo *repository.FuturesRepository, adapter adapter.DataSourceAdapter) FuturesService {
return &FuturesServiceImpl{
repo: repo,
adapter: adapter,
}
}
// QueryKLines 查询K线数据
// 实现回源机制:先查数据库,如数据缺失则从数据源获取并保存
func (s *FuturesServiceImpl) QueryKLines(ctx context.Context, req *api.KLineQueryRequest) (*api.KLineData, error) {
// 解析日期
start, err := time.Parse("20060102", req.Start)
if err != nil {
return nil, fmt.Errorf("invalid start date: %w", err)
}
end, err := time.Parse("20060102", req.End)
if err != nil {
return nil, fmt.Errorf("invalid end date: %w", err)
}
end = end.Add(24 * time.Hour).Add(-time.Second)
// 1. 先尝试从数据库获取数据
items, err := s.repo.GetKLines(ctx, req.Symbol, req.Freq, start, end)
if err != nil {
return nil, err
}
// 2. 判断数据是否完整
shouldFetchFromSource := len(items) == 0 || s.isDataIncomplete(req.Start, req.End, req.Freq, len(items))
// 3. 如果数据不完整且有配置适配器,则从数据源获取
if shouldFetchFromSource && s.adapter != nil {
fetchedItems, err := s.fetchFromSourceAndSave(ctx, req, start, end)
if err == nil && len(fetchedItems) > 0 {
items = fetchedItems
}
}
return &api.KLineData{
Symbol: req.Symbol,
Freq: req.Freq,
Count: len(items),
Items: items,
}, nil
}
// isDataIncomplete 判断数据是否不完整
func (s *FuturesServiceImpl) isDataIncomplete(start, end string, freq api.Frequency, count int) bool {
startDate, _ := time.Parse("20060102", start)
endDate, _ := time.Parse("20060102", end)
expectedDays := int(endDate.Sub(startDate).Hours()/24) + 1
switch freq {
case api.Freq1Day:
// 期货日线:预期交易日数量约为自然日数量的 60%(扣除周末和节假日)
expectedTradingDays := int(float64(expectedDays) * 0.6)
return count < expectedTradingDays
default:
// 分钟线:不判断完整性
return false
}
}
// fetchFromSourceAndSave 从数据源获取数据并保存到数据库
func (s *FuturesServiceImpl) fetchFromSourceAndSave(ctx context.Context, req *api.KLineQueryRequest, start, end time.Time) ([]api.KLineItem, error) {
// 从数据源获取
adapterData, err := s.adapter.FetchKLines(req.Symbol, req.Start, req.End, string(req.Freq))
if err != nil {
return nil, fmt.Errorf("fetch from source failed: %w", err)
}
if len(adapterData) == 0 {
return nil, nil
}
// 转换为 repository 格式
saveItems := make([]api.KLineItem, len(adapterData))
resultItems := make([]api.KLineItem, len(adapterData))
for i, d := range adapterData {
item := api.KLineItem{
Time: time.Unix(d.Time, 0),
Open: d.Open,
High: d.High,
Low: d.Low,
Close: d.Close,
Volume: d.Volume,
Amount: d.Amount,
OpenInterest: &d.OpenInterest,
}
saveItems[i] = item
resultItems[i] = item
}
// 异步保存到数据库(不阻塞响应)
go func() {
saveCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := s.repo.SaveKLines(saveCtx, req.Freq, req.Symbol, saveItems); err != nil {
// 记录错误但不影响返回结果
fmt.Printf("save klines to db failed: %v\n", err)
}
}()
return resultItems, nil
}
// ListSymbols 查询标的列表
func (s *FuturesServiceImpl) ListSymbols(ctx context.Context, req *api.SymbolListRequest) (*api.SymbolListData, error) {
symbols, total, err := s.repo.ListSymbols(ctx, req)
if err != nil {
return nil, err
}
return &api.SymbolListData{
Total: total,
Page: req.Page,
Size: req.Size,
Items: symbols,
}, nil
}
// BatchQueryKLines 批量查询K线
func (s *FuturesServiceImpl) BatchQueryKLines(ctx context.Context, req *api.BatchKLineRequest) (*api.BatchKLineData, error) {
results := make([]api.BatchKLineResult, len(req.Symbols))
for i, symbol := range req.Symbols {
singleReq := &api.KLineQueryRequest{
Symbol: symbol,
Start: req.Start,
End: req.End,
Freq: req.Freq,
}
data, err := s.QueryKLines(ctx, singleReq)
results[i] = api.BatchKLineResult{
Symbol: symbol,
Success: err == nil,
}
if err != nil {
results[i].Error = err.Error()
} else {
results[i].Data = &api.KLineSubData{
Count: data.Count,
Items: data.Items,
}
}
}
return &api.BatchKLineData{Results: results}, nil
}
// GetTradingDates 获取交易日历
func (s *FuturesServiceImpl) GetTradingDates(ctx context.Context, req *api.TradingDatesRequest) (*api.TradingDatesData, error) {
return s.repo.GetTradingDates(ctx, req.Start, req.End)
}
// GetContractsByUnderlying 根据品种获取合约
func (s *FuturesServiceImpl) GetContractsByUnderlying(ctx context.Context, req *api.FuturesContractsRequest) (*api.FuturesContractsData, error) {
return s.repo.GetContractsByUnderlying(ctx, req.Underlying, req.Exchange)
}