You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

130 lines
4.5 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""管理服务 - 对应Go的internal/service/admin.go"""
from datetime import datetime
from typing import Optional
import uuid
from sqlalchemy.orm import Session
from sqlalchemy import text
from app.models import (
DataSourceStatusData, DataSourceInfo, SourceSwitchRequest,
BackfillRequest, HealthResponse, DataSourceStatus
)
from app.core.config import get_config, save_config
from app.core.logger import info
class AdminService:
"""管理服务"""
def __init__(self, db: Session):
self.db = db
def get_data_source_status(self) -> DataSourceStatusData:
"""获取数据源状态"""
config = get_config()
# 始终以配置文件为基础(确保内存中的最新配置)
data = DataSourceStatusData()
data.stock.active_source = config.sources.stock.active
data.futures.active_source = config.sources.futures.active
try:
# 查询数据库中的数据源配置,用数据库值覆盖(如果存在)
result = self.db.execute(text("""
SELECT asset_class, active_source, standby_sources, updated_at
FROM data_source_config
"""))
for row in result:
asset_class, active_source, standby_sources, updated_at = row
if asset_class == "stock":
data.stock.active_source = active_source
elif asset_class == "futures":
data.futures.active_source = active_source
return data
except Exception as e:
info(f"Data source config not found, using config file only: {e}")
return data
def switch_data_source(self, req: SourceSwitchRequest) -> None:
"""切换数据源"""
from app.core.config import get_config, save_config
config = get_config()
# 更新内存中的配置
# AssetClass 继承自 str可以直接比较
if req.asset_class == "all":
config.sources.stock.active = req.source
config.sources.futures.active = req.source
elif req.asset_class == "stock":
config.sources.stock.active = req.source
elif req.asset_class == "futures":
config.sources.futures.active = req.source
# 保存到配置文件
try:
save_config(config)
except Exception as e:
info(f"Failed to save config: {e}")
# 同时尝试保存到数据库(如果支持)
try:
asset_classes = []
if req.asset_class == "all":
asset_classes = ["stock", "futures"]
else:
asset_classes = [str(req.asset_class)]
for ac in asset_classes:
# MySQL: INSERT ... ON DUPLICATE KEY UPDATE
self.db.execute(
text("""
INSERT INTO data_source_config
(asset_class, active_source, updated_at)
VALUES (:asset_class, :source, NOW())
ON DUPLICATE KEY UPDATE
active_source = VALUES(active_source),
updated_at = VALUES(updated_at)
"""),
{"asset_class": ac, "source": req.source}
)
self.db.commit()
except Exception as e:
info(f"Database update failed (using config file only): {e}")
self.db.rollback()
# 如果需要同步补录,启动后台任务
if req.sync_backfill:
info(f"Starting backfill for {req.asset_class} from {req.start_date}")
# TODO: 启动异步补录任务
def backfill_data(self, req: BackfillRequest) -> str:
"""历史数据补录"""
task_id = str(uuid.uuid4())
info(f"Starting backfill task {task_id} for {req.asset_class}")
# TODO: 将补录任务存入数据库启动后台Worker执行
return task_id
def health_check(self) -> HealthResponse:
"""健康检查"""
try:
# 检查数据库连接
self.db.execute(text("SELECT 1"))
return HealthResponse(
status="healthy",
timestamp=datetime.now()
)
except Exception as e:
return HealthResponse(
status=f"unhealthy: {str(e)}",
timestamp=datetime.now()
)