You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

989 lines
23 KiB

# 管理后台开发指南
本文档适用于 **Go****Python** 双实现。示例代码会标注对应的实现语言。
---
## 快速开始
### 1. 环境准备
#### Go 环境
```bash
# 确保Go版本 >= 1.21
go version
# 安装依赖
cd market-data-service
go mod download
```
#### Python 环境
```bash
# 确保Python版本 >= 3.10
python --version
# 创建虚拟环境
cd python_market_data_service
python -m venv venv
# 激活虚拟环境
# Linux/Mac:
source venv/bin/activate
# Windows:
venv\Scripts\activate
# 安装依赖
pip install -r requirements.txt
pip install tushare
```
### 2. 启动服务
#### Go
```bash
# 方式1直接运行
go run ./cmd/server
# 方式2使用配置文件
export CONFIG_PATH="./config.json"
go run ./cmd/server
# 方式3Makefile
make run
```
#### Python
```bash
# 方式1直接运行开发模式
python -m app.main
# 方式2使用uvicorn推荐
uvicorn app.main:app --reload --port 8080
# 方式3生产模式
uvicorn app.main:app --host 0.0.0.0 --port 8080 --workers 4
```
### 3. 访问管理后台
浏览器打开:`http://localhost:8080/admin`
API文档`http://localhost:8080/docs` (仅Python/FastAPI自动生成)
---
## 开发新功能
### 场景1添加新的配置项
假设需要添加一个新的缓存配置项:
#### Go 实现
**步骤1**: 修改配置结构 (`pkg/config/config.go`)
```go
type Config struct {
// ... 现有配置
Cache CacheConfig `json:"cache"`
}
type CacheConfig struct {
Enabled bool `json:"enabled"`
TTL int `json:"ttl"` // 缓存过期时间(秒)
MaxSize int `json:"max_size"` // 最大缓存条目数
}
```
**步骤2**: 在配置服务中添加新分组 (`internal/service/config.go`)
```go
func (s *ConfigServiceImpl) GetConfigList(...) {
// ... 现有配置分组
// 添加缓存配置分组
sections = append(sections, api.ConfigSection{
Name: "缓存配置",
Type: api.ConfigType("cache"), // 添加新的类型常量
Description: "数据缓存相关配置",
Items: []api.ConfigItem{
{
Key: "enabled",
Value: s.config.Cache.Enabled,
Type: "bool",
Description: "是否启用缓存",
Editable: true,
Required: false,
},
{
Key: "ttl",
Value: s.config.Cache.TTL,
Type: "int",
Description: "缓存过期时间(秒)",
Editable: true,
Required: false,
},
{
Key: "max_size",
Value: s.config.Cache.MaxSize,
Type: "int",
Description: "最大缓存条目数",
Editable: true,
Required: false,
},
},
})
}
```
**步骤3**: 添加配置更新处理
```go
func (s *ConfigServiceImpl) UpdateConfig(...) {
switch req.Type {
// ... 现有 case
case api.ConfigType("cache"):
if enabled, ok := req.Items["enabled"]; ok {
s.config.Cache.Enabled = enabled.(bool)
}
if ttl, ok := req.Items["ttl"]; ok {
s.config.Cache.TTL = int(ttl.(float64))
}
if maxSize, ok := req.Items["max_size"]; ok {
s.config.Cache.MaxSize = int(maxSize.(float64))
}
}
// 保存配置
if err := s.saveConfig(); err != nil {
return nil, err
}
return &api.ConfigUpdateData{
Success: true,
NeedRestart: false, // 缓存配置支持热加载
Message: "配置更新成功",
}, nil
}
```
**步骤4**: 添加配置变更回调(可选)
```go
// 在初始化时注册回调
configService.RegisterCallback(api.ConfigType("cache"), func() {
// 重新初始化缓存
cache.Init(configService.GetCurrentConfig().Cache)
})
```
#### Python 实现
**步骤1**: 修改配置结构 (`app/core/config.py`)
```python
class CacheConfig(BaseModel):
"""缓存配置"""
enabled: bool = False
ttl: int = 3600 # 缓存过期时间(秒)
max_size: int = 1000 # 最大缓存条目数
class Config(BaseModel):
"""主配置类"""
# ... 现有配置
cache: CacheConfig = Field(default_factory=CacheConfig)
```
**步骤2**: 在配置服务中添加新分组 (`app/services/config_service.py`)
```python
from app.models import ConfigType
class ConfigService:
def get_config_list(self, req: ConfigListRequest) -> ConfigListData:
sections = []
# ... 现有配置分组
# 添加缓存配置分组
if not req.type or req.type == ConfigType.CACHE:
sections.append(ConfigSection(
name="缓存配置",
type=ConfigType.CACHE,
description="数据缓存相关配置",
items=[
ConfigItem(
key="enabled",
value=self.config.cache.enabled,
type="bool",
description="是否启用缓存",
editable=True,
required=False
),
ConfigItem(
key="ttl",
value=self.config.cache.ttl,
type="int",
description="缓存过期时间(秒)",
editable=True,
required=False
),
ConfigItem(
key="max_size",
value=self.config.cache.max_size,
type="int",
description="最大缓存条目数",
editable=True,
required=False
),
]
))
return ConfigListData(sections=sections, ...)
```
**步骤3**: 添加配置更新处理
```python
def update_config(self, req: ConfigUpdateRequest) -> ConfigUpdateData:
need_restart = False
with self.lock:
if req.type == ConfigType.CACHE:
if "enabled" in req.items:
self.config.cache.enabled = bool(req.items["enabled"])
if "ttl" in req.items:
self.config.cache.ttl = int(req.items["ttl"])
if "max_size" in req.items:
self.config.cache.max_size = int(req.items["max_size"])
# 保存到文件
try:
save_config(self.config)
self._trigger_callbacks(req.type)
return ConfigUpdateData(
success=True,
need_restart=False, # 缓存配置支持热加载
message="配置更新成功"
)
except Exception as e:
return ConfigUpdateData(
success=False,
message=f"配置保存失败: {e}"
)
```
**步骤4**: 添加配置变更回调(可选)
```python
# 在初始化时注册回调
config_service.register_callback(ConfigType.CACHE, lambda:
cache.init(config_service.get_current_config().cache)
)
```
---
### 场景2添加新的数据源适配器
假设需要添加一个名为 "mydata" 的适配器:
#### Go 实现
**步骤1**: 创建适配器实现 (`adapter/mydata/adapter.go`)
```go
package mydata
import (
"market-data-service/adapter"
)
type Adapter struct {
client *Client
config map[string]string
}
func NewAdapter() *Adapter {
return &Adapter{}
}
func (a *Adapter) Connect(config map[string]string) error {
a.config = config
// 初始化客户端连接
a.client = NewClient(config["api_key"])
return nil
}
func (a *Adapter) SubscribeTicks(symbols []string, callback adapter.TickCallback) error {
// 实现实时数据订阅
return nil
}
func (a *Adapter) FetchKLines(symbol, start, end, freq string) ([]adapter.KLineData, error) {
// 实现K线数据获取
return nil, nil
}
func (a *Adapter) FetchSymbols(assetType string) ([]adapter.SymbolInfo, error) {
// 实现标的列表获取
return nil, nil
}
func (a *Adapter) FetchTradingCalendar(exchange, start, end string) ([]adapter.TradeCalData, error) {
// 实现交易日历获取
return nil, nil
}
func (a *Adapter) HealthCheck() error {
// 实现健康检查
return nil
}
func (a *Adapter) Close() error {
// 实现资源释放
return nil
}
```
**步骤2**: 注册适配器 (`internal/service/adapter.go`)
```go
import "market-data-service/adapter/mydata"
func (s *AdapterServiceImpl) registerBuiltinAdapters() {
// ... 现有适配器注册
// 注册 MyData 适配器
s.RegisterAdapter("mydata", func() adapter.DataSourceAdapter {
return mydata.NewAdapter()
})
// 设置元数据
s.metadata["mydata"] = &adapterMetadata{
Name: "mydata",
Type: "http",
Version: "1.0.0",
Description: "MyData 金融数据接口",
UpdatedAt: time.Now(),
}
// 默认配置
s.configs["mydata"] = &adapterConfig{
Enabled: false,
Config: map[string]string{
"api_key": "",
"base_url": "https://api.mydata.com",
},
}
}
```
#### Python 实现
**步骤1**: 创建适配器实现 (`app/adapters/mydata_adapter.py`)
```python
from typing import List
from app.adapters.base import (
DataSourceAdapter, TickData, KLineData,
SymbolInfo, TradeCalData, TickCallback
)
class MyDataAdapter(DataSourceAdapter):
"""MyData数据源适配器"""
def __init__(self):
self.client = None
self.config = {}
async def connect(self, config: dict) -> None:
self.config = config
# 初始化客户端连接
self.client = MyDataClient(config["api_key"])
async def subscribe_ticks(self, symbols: List[str], callback: TickCallback) -> None:
# 实现实时数据订阅
pass
async def fetch_klines(
self, symbol: str, start: str, end: str, freq: str
) -> List[KLineData]:
# 实现K线数据获取
return []
async def fetch_symbols(self, asset_type: str) -> List[SymbolInfo]:
# 实现标的列表获取
return []
async def fetch_trading_calendar(
self, exchange: str, start: str, end: str
) -> List[TradeCalData]:
# 实现交易日历获取
return []
async def health_check(self) -> bool:
# 实现健康检查
return True
async def close(self) -> None:
# 实现资源释放
pass
```
**步骤2**: 注册适配器 (`app/services/adapter_service.py`)
```python
from app.adapters.mydata_adapter import MyDataAdapter
class AdapterService:
def _register_builtin_adapters(self):
# ... 现有适配器注册
# 注册 MyData 适配器
self.register_adapter("mydata", lambda: MyDataAdapter())
# 设置元数据
self.metadata["mydata"] = {
"name": "mydata",
"type": "http",
"version": "1.0.0",
"description": "MyData 金融数据接口",
"updated_at": datetime.now()
}
# 默认配置
self.configs["mydata"] = {
"enabled": False,
"config": {
"api_key": "",
"base_url": "https://api.mydata.com"
}
}
```
**步骤3**: 前端会自动从 `/v1/admin/adapters` 接口获取适配器列表,无需额外修改。
---
### 场景3添加新的接口测试用例
#### Go 实现
**步骤1**: 在测试服务中添加用例 (`internal/service/test.go`)
```go
func (s *TestServiceImpl) GetAPITestList(...) {
categories := []api.APITestCategory{
// ... 现有分类
{
Name: "自定义接口",
Items: []api.APITestCase{
{
ID: "custom_endpoint",
Name: "自定义端点测试",
Method: "GET",
Path: "/v1/custom/{id}",
Description: "测试自定义端点",
Params: map[string]string{
"id": "123",
},
},
{
ID: "custom_post",
Name: "自定义POST接口",
Method: "POST",
Path: "/v1/custom/create",
Description: "测试创建接口",
Body: map[string]interface{}{
"name": "test",
"value": 100,
},
},
},
},
}
return &api.APITestListData{
Categories: categories,
BaseURL: "",
}, nil
}
```
#### Python 实现
**步骤1**: 在测试服务中添加用例 (`app/services/test_service.py`)
```python
def get_api_test_list(self) -> APITestListData:
categories = [
# ... 现有分类
APITestCategory(
name="自定义接口",
items=[
APITestCase(
id="custom_endpoint",
name="自定义端点测试",
method="GET",
path="/v1/custom/{id}",
description="测试自定义端点",
params={"id": "123"}
),
APITestCase(
id="custom_post",
name="自定义POST接口",
method="POST",
path="/v1/custom/create",
description="测试创建接口",
body={"name": "test", "value": 100}
),
]
),
]
return APITestListData(categories=categories, base_url="")
```
**步骤2**: 前端会自动显示新的测试用例,无需修改前端代码。
---
## 调试技巧
### 1. 查看配置变更日志
#### Go
```go
// 在 ConfigService 中添加日志
func (s *ConfigServiceImpl) UpdateConfig(...) {
log.Printf("[Config] Updating config type: %s", req.Type)
log.Printf("[Config] Update items: %+v", req.Items)
// ... 更新逻辑
log.Printf("[Config] Config updated successfully, need restart: %v", needRestart)
}
```
#### Python
```python
# 在 ConfigService 中添加日志
from app.core.logger import info
def update_config(self, req: ConfigUpdateRequest) -> ConfigUpdateData:
info(f"[Config] Updating config type: {req.type}")
info(f"[Config] Update items: {req.items}")
# ... 更新逻辑
info(f"[Config] Config updated successfully, need restart: {need_restart}")
```
### 2. 适配器调试
#### Go
```go
// 在适配器中添加详细日志
func (a *MyAdapter) FetchKLines(...) {
log.Printf("[MyAdapter] FetchKLines called: symbol=%s, start=%s, end=%s, freq=%s",
symbol, start, end, freq)
// ... 实现逻辑
log.Printf("[MyAdapter] FetchKLines completed: got %d records", len(result))
return result, nil
}
```
#### Python
```python
# 在适配器中添加详细日志
from app.core.logger import info
async def fetch_klines(self, symbol: str, start: str, end: str, freq: str) -> List[KLineData]:
info(f"[MyAdapter] FetchKLines called: symbol={symbol}, start={start}, end={end}, freq={freq}")
# ... 实现逻辑
info(f"[MyAdapter] FetchKLines completed: got {len(result)} records")
return result
```
### 3. 使用调试器
#### Go (Delve)
```bash
# 启动调试模式
dlv debug ./cmd/server
# 在关键位置设置断点
(dlv) break internal/service/config.go:100
(dlv) break internal/service/adapter.go:150
# 运行
(dlv) continue
```
#### Python (pdb 或 IDE)
```python
# 代码中插入断点
import pdb; pdb.set_trace()
# 或使用IPython
from IPython import embed; embed()
```
或使用 VS Code / PyCharm 的图形化调试。
---
## 测试
### 单元测试示例
#### Go
```go
// internal/service/config_test.go
package service
import (
"context"
"testing"
"market-data-service/api"
)
func TestConfigService_UpdateConfig(t *testing.T) {
// 创建临时配置文件
tmpFile := t.TempDir() + "/config.json"
service, err := NewConfigService(tmpFile)
if err != nil {
t.Fatal(err)
}
// 测试更新配置
req := &api.ConfigUpdateRequest{
Type: api.ConfigTypeServer,
Items: map[string]interface{}{
"port": 9090,
},
}
result, err := service.UpdateConfig(context.Background(), req)
if err != nil {
t.Fatal(err)
}
if !result.Success {
t.Errorf("expected success, got: %v", result.Message)
}
// 验证配置已更新
cfg := service.GetCurrentConfig()
if cfg.Server.Port != 9090 {
t.Errorf("expected port 9090, got: %d", cfg.Server.Port)
}
}
```
#### Python
```python
# tests/test_config_service.py
import pytest
from app.services.config_service import ConfigService
from app.models import ConfigUpdateRequest, ConfigType
def test_config_service_update():
# 创建服务
service = ConfigService()
# 测试更新配置
req = ConfigUpdateRequest(
type=ConfigType.SERVER,
items={"port": 9090}
)
result = service.update_config(req)
assert result.success is True
assert service.get_current_config().server.port == 9090
```
运行测试:
```bash
# Go
go test ./internal/service/...
# Python
pytest tests/
```
### API 测试
```bash
# 使用 httpie 测试
http GET localhost:8080/v1/admin/system/status
http POST localhost:8080/v1/admin/config \
type=server \
items:='{"port": 8080}'
# 使用 curl 测试
curl -X POST "http://localhost:8080/v1/admin/tests/api/run" \
-H "Content-Type: application/json" \
-d '{"id": "stock_klines", "params": {"symbol": "000001.SZ"}}'
```
---
## 常见问题
### Q1: 配置热加载不生效?
**检查点**:
1. 配置文件路径是否正确
2. 配置类型是否支持热加载(数据库配置需重启)
3. 回调函数是否正确注册
**Go**:
```go
// 检查配置是否正确保存
config, _ := service.GetConfigList(ctx, &api.ConfigListRequest{})
fmt.Printf("Current config: %+v\n", config)
```
**Python**:
```python
# 检查配置是否正确保存
config = service.get_config_list(ConfigListRequest())
print(f"Current config: {config}")
```
### Q2: 适配器无法启用?
**检查点**:
1. 适配器是否已注册
2. 配置项是否完整(如 token
3. Connect 方法是否返回错误
**Go**:
```go
// 添加调试日志
func (s *AdapterServiceImpl) ToggleAdapter(...) {
log.Printf("[Adapter] Toggling adapter: %s, enable: %v", req.Name, req.Enable)
cfg, ok := s.configs[req.Name]
if !ok {
log.Printf("[Adapter] Adapter not found: %s", req.Name)
return fmt.Errorf("adapter not found: %s", req.Name)
}
log.Printf("[Adapter] Current config: %+v", cfg)
// ...
}
```
**Python**:
```python
# 添加调试日志
def toggle_adapter(self, req: AdapterToggleRequest) -> None:
info(f"[Adapter] Toggling adapter: {req.name}, enable: {req.enable}")
if req.name not in self.configs:
info(f"[Adapter] Adapter not found: {req.name}")
raise ValueError(f"Adapter not found: {req.name}")
info(f"[Adapter] Current config: {self.configs[req.name]}")
# ...
```
### Q3: 前端页面空白?
**检查点**:
1. 浏览器控制台是否有错误
2. API 接口是否返回正确数据
3. 静态资源是否正确加载
```bash
# 检查管理后台路由是否正常
curl http://localhost:8080/admin
# 检查API接口
curl http://localhost:8080/v1/admin/system/status
```
### Q4: Python 依赖安装失败?
**解决方案**:
```bash
# 1. 升级pip
pip install --upgrade pip
# 2. 安装系统依赖Ubuntu/Debian
sudo apt install -y python3-dev libpq-dev gcc
# 3. 使用国内镜像
pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
# 4. 单独安装有问题的包
pip install sqlalchemy psycopg2-binary --force-reinstall
```
---
## 性能优化建议
### 1. 配置缓存
#### Go
```go
// 添加配置缓存层
type CachedConfigService struct {
inner ConfigService
cache *config.Config
mu sync.RWMutex
ttl time.Duration
lastUpdate time.Time
}
func (s *CachedConfigService) GetCurrentConfig() *config.Config {
s.mu.RLock()
if time.Since(s.lastUpdate) < s.ttl {
defer s.mu.RUnlock()
return s.cache
}
s.mu.RUnlock()
// 缓存过期,重新加载
s.mu.Lock()
defer s.mu.Unlock()
s.cache = s.inner.GetCurrentConfig()
s.lastUpdate = time.Now()
return s.cache
}
```
#### Python
```python
# 添加配置缓存层
from functools import lru_cache
import time
class CachedConfigService:
def __init__(self, inner: ConfigService, ttl: int = 60):
self.inner = inner
self.cache = None
self.ttl = ttl
self.last_update = 0
self.lock = threading.RLock()
def get_current_config(self):
with self.lock:
if time.time() - self.last_update < self.ttl:
return self.cache
self.cache = self.inner.get_current_config()
self.last_update = time.time()
return self.cache
```
### 2. 数据库连接池
#### Go
```go
// 在 repository/database.go 中配置
db.SetMaxOpenConns(100)
db.SetMaxIdleConns(10)
db.SetConnMaxLifetime(time.Hour)
```
#### Python
```python
# 在 app/repositories/database.py 中配置
engine = create_engine(
config.database.database_url,
pool_size=10,
max_overflow=20,
pool_pre_ping=True,
pool_recycle=3600,
)
```
---
## 最佳实践
### 1. 配置管理
- 敏感信息密码、token使用 `********` 掩码显示
- 重要配置变更记录操作日志
- 配置验证在更新前进行
### 2. 适配器开发
- 实现完整的 `HealthCheck` 方法
- 处理网络超时和重试
- 使用连接池管理资源
### 3. 接口测试
- 使用参数化测试数据
- 测试失败时记录详细错误信息
- 清理测试产生的临时数据
### 4. 代码组织
**Go**:
- 按功能分层api/internal/pkg/cmd
- 使用接口定义依赖
- 错误处理要明确
**Python**:
- 使用类型注解
- 遵循PEP 8规范
- 合理使用异步
---
## 参考资源
### Go
- [Gin 框架文档](https://gin-gonic.com/docs/)
- [Go 并发模式](https://go.dev/blog/pipelines)
- [PostgreSQL 文档](https://www.postgresql.org/docs/)
### Python
- [FastAPI 文档](https://fastapi.tiangolo.com/)
- [SQLAlchemy 文档](https://docs.sqlalchemy.org/)
- [Pydantic 文档](https://docs.pydantic.dev/)
---
**文档结束**