You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

294 lines
7.4 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package tushare
import (
"context"
"fmt"
"strings"
"time"
"market-data-service/adapter"
)
// Adapter Tushare数据源适配器
type Adapter struct {
client *Client
config map[string]string
}
// NewAdapter 创建Tushare适配器
func NewAdapter() *Adapter {
return &Adapter{}
}
// Connect 建立连接
func (a *Adapter) Connect(config map[string]string) error {
token, ok := config["token"]
if !ok || token == "" {
return fmt.Errorf("tushare token is required")
}
a.client = NewClient(token)
if baseURL, ok := config["base_url"]; ok && baseURL != "" {
a.client.SetBaseURL(baseURL)
}
a.config = config
return nil
}
// SubscribeTicks 订阅实时TickTushare不支持实时推送返回错误
func (a *Adapter) SubscribeTicks(symbols []string, callback adapter.TickCallback) error {
return fmt.Errorf("tushare does not support real-time tick subscription")
}
// FetchKLines 拉取历史K线
func (a *Adapter) FetchKLines(symbol, start, end, freq string) ([]adapter.KLineData, error) {
// 判断是股票还是期货
if strings.Contains(symbol, ".SH") || strings.Contains(symbol, ".SZ") || strings.Contains(symbol, ".BJ") {
return a.fetchStockKLines(symbol, start, end, freq)
}
return a.fetchFuturesKLines(symbol, start, end, freq)
}
// fetchStockKLines 获取股票K线
func (a *Adapter) fetchStockKLines(symbol, start, end, freq string) ([]adapter.KLineData, error) {
// 转换symbol格式: 000001.SZ -> 000001.SZ
tsCode := symbol
switch freq {
case "1d", "", "day":
return a.fetchStockDaily(tsCode, start, end)
case "1m", "5m", "15m", "30m", "60m":
return a.fetchStockMinute(tsCode, start, end, freq)
default:
return nil, fmt.Errorf("unsupported frequency: %s", freq)
}
}
// fetchStockDaily 获取股票日线
func (a *Adapter) fetchStockDaily(tsCode, start, end string) ([]adapter.KLineData, error) {
data, err := a.client.GetStockDaily(tsCode, start, end)
if err != nil {
return nil, err
}
result := make([]adapter.KLineData, len(data))
for i, d := range data {
tradeDate, _ := time.Parse("20060102", d.TradeDate)
result[i] = adapter.KLineData{
Symbol: d.TSCode,
Time: tradeDate.Unix(),
Open: d.Open,
High: d.High,
Low: d.Low,
Close: d.Close,
Volume: int64(d.Volume * 100), // 手 -> 股
Amount: d.Amount * 1000, // 千元 -> 元
}
}
return result, nil
}
// fetchStockMinute 获取股票分钟线
func (a *Adapter) fetchStockMinute(tsCode, start, end, freq string) ([]adapter.KLineData, error) {
// 去掉'm'后缀
freqNum := strings.TrimSuffix(freq, "m")
data, err := a.client.GetStockMinute(tsCode, start, end, freqNum)
if err != nil {
return nil, err
}
result := make([]adapter.KLineData, len(data))
for i, d := range data {
// 解析时间格式: 2025-03-07 09:31:00
tradeTime, _ := time.Parse("2006-01-02 15:04:05", d.TradeTime)
result[i] = adapter.KLineData{
Symbol: d.TSCode,
Time: tradeTime.Unix(),
Open: d.Open,
High: d.High,
Low: d.Low,
Close: d.Close,
Volume: int64(d.Volume * 100),
Amount: d.Amount * 1000,
}
}
return result, nil
}
// fetchFuturesKLines 获取期货K线
func (a *Adapter) fetchFuturesKLines(symbol, start, end, freq string) ([]adapter.KLineData, error) {
switch freq {
case "1d", "", "day":
return a.fetchFuturesDaily(symbol, start, end)
case "1m", "5m", "15m", "30m", "60m":
return a.fetchFuturesMinute(symbol, start, end, freq)
default:
return nil, fmt.Errorf("unsupported frequency: %s", freq)
}
}
// fetchFuturesDaily 获取期货日线
func (a *Adapter) fetchFuturesDaily(tsCode, start, end string) ([]adapter.KLineData, error) {
data, err := a.client.GetFuturesDaily(tsCode, start, end)
if err != nil {
return nil, err
}
result := make([]adapter.KLineData, len(data))
for i, d := range data {
tradeDate, _ := time.Parse("20060102", d.TradeDate)
result[i] = adapter.KLineData{
Symbol: d.TSCode,
Time: tradeDate.Unix(),
Open: d.Open,
High: d.High,
Low: d.Low,
Close: d.Close,
Volume: int64(d.Volume),
Amount: d.Amount * 10000, // 万元 -> 元
OpenInterest: int64(d.OpenInterest),
}
}
return result, nil
}
// fetchFuturesMinute 获取期货分钟线
func (a *Adapter) fetchFuturesMinute(tsCode, start, end, freq string) ([]adapter.KLineData, error) {
freqNum := strings.TrimSuffix(freq, "m")
data, err := a.client.GetFuturesMinute(tsCode, start, end, freqNum)
if err != nil {
return nil, err
}
result := make([]adapter.KLineData, len(data))
for i, d := range data {
tradeTime, _ := time.Parse("2006-01-02 15:04:05", d.TradeTime)
result[i] = adapter.KLineData{
Symbol: d.TSCode,
Time: tradeTime.Unix(),
Open: d.Open,
High: d.High,
Low: d.Low,
Close: d.Close,
Volume: int64(d.Volume),
Amount: d.Amount * 10000,
OpenInterest: int64(d.OpenInterest),
}
}
return result, nil
}
// FetchSymbols 获取标的列表
func (a *Adapter) FetchSymbols(assetType string) ([]adapter.SymbolInfo, error) {
switch assetType {
case "stock":
return a.fetchStockSymbols()
case "futures":
return a.fetchFuturesSymbols()
default:
return nil, fmt.Errorf("unsupported asset type: %s", assetType)
}
}
// fetchStockSymbols 获取股票列表
func (a *Adapter) fetchStockSymbols() ([]adapter.SymbolInfo, error) {
data, err := a.client.GetStockBasic()
if err != nil {
return nil, err
}
result := make([]adapter.SymbolInfo, 0, len(data))
for _, d := range data {
// 只返回上市状态的
if d.ListStatus != "L" {
continue
}
result = append(result, adapter.SymbolInfo{
SymbolID: d.TSCode,
Name: d.Name,
Exchange: d.Exchange,
})
}
return result, nil
}
// fetchFuturesSymbols 获取期货列表
func (a *Adapter) fetchFuturesSymbols() ([]adapter.SymbolInfo, error) {
data, err := a.client.GetFuturesBasic("")
if err != nil {
return nil, err
}
result := make([]adapter.SymbolInfo, len(data))
for i, d := range data {
result[i] = adapter.SymbolInfo{
SymbolID: d.TSCode,
Name: d.Name,
Exchange: d.Exchange,
}
}
return result, nil
}
// FetchTradingCalendar 获取交易日历
func (a *Adapter) FetchTradingCalendar(exchange, start, end string) ([]adapter.TradeCalData, error) {
// Tushare交易所代码映射
exchangeMap := map[string]string{
"SH": "SSE",
"SZ": "SZSE",
"SHFE": "SHFE",
"DCE": "DCE",
"CZCE": "CZCE",
"CFFEX": "CFFEX",
"INE": "INE",
}
tsExchange, ok := exchangeMap[exchange]
if !ok {
tsExchange = "SSE"
}
data, err := a.client.GetTradeCal(tsExchange, start, end, -1)
if err != nil {
return nil, err
}
result := make([]adapter.TradeCalData, len(data))
for i, d := range data {
calDate, _ := time.Parse("20060102", d.CalDate)
result[i] = adapter.TradeCalData{
Date: calDate,
IsTradingDay: d.IsOpen == 1,
}
}
return result, nil
}
// HealthCheck 健康检查
func (a *Adapter) HealthCheck() error {
if a.client == nil {
return fmt.Errorf("client not initialized")
}
// 尝试获取交易日历作为健康检查
now := time.Now()
start := now.AddDate(0, 0, -7).Format("20060102")
end := now.Format("20060102")
_, err := a.client.GetTradeCal("SSE", start, end, 1)
return err
}
// Close 关闭连接
func (a *Adapter) Close() error {
// Tushare是HTTP接口无需关闭
return nil
}
// GetClient 获取底层客户端用于直接调用Tushare API
func (a *Adapter) GetClient() *Client {
return a.client
}