You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

284 lines
8.8 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
同步管理 API v2
金融数据中台 - 数据同步控制接口
"""
from datetime import datetime
from typing import Annotated, List
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from app.db.init_db import get_sqlite_db
from app.schemas import ResponseData
router = APIRouter()
@router.get("/config", response_model=ResponseData)
async def get_sync_config(db: Session = Depends(get_sqlite_db)):
"""获取同步配置"""
from sqlalchemy import text
try:
result = db.execute(text("""
SELECT config_key, config_value, description, updated_at
FROM sync_config
ORDER BY config_key
"""))
configs = {}
for row in result:
configs[row[0]] = {
"value": row[1],
"description": row[2],
"updated_at": row[3]
}
return ResponseData(
code=0,
message="success",
data={"configs": configs, "count": len(configs)}
)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Failed to get sync config: {str(e)}"
)
@router.put("/config", response_model=ResponseData)
async def update_sync_config(
config: Annotated[dict, Query(description="配置项JSON 格式")],
db: Session = Depends(get_sqlite_db)
):
"""
更新同步配置
示例:
```json
{
"sync_time": "17:00",
"sync_symbols": "IF2406,IC2406,IH2406,IM2406",
"sync_periods": "1m,5m,15m,30m,1h,1d",
"cache_ttl": "300"
}
```
"""
from sqlalchemy import text
from datetime import datetime
try:
for key, value in config.items():
db.execute(text("""
INSERT INTO sync_config (config_key, config_value, description, updated_at)
VALUES (:key, :value, :desc, :updated_at)
ON CONFLICT(config_key) DO UPDATE SET
config_value = :value,
updated_at = :updated_at
"""), {
"key": key,
"value": str(value),
"desc": f"配置项 {key}",
"updated_at": datetime.utcnow()
})
db.commit()
return ResponseData(
code=0,
message="Config updated successfully",
data={"updated": list(config.keys())}
)
except Exception as e:
db.rollback()
raise HTTPException(
status_code=500,
detail=f"Failed to update sync config: {str(e)}"
)
@router.post("/trigger", response_model=ResponseData)
async def trigger_sync(
sync_type: Annotated[str, Query(description="同步类型kline/realtime/all")] = "kline",
symbols: Annotated[str, Query(description="品种列表,逗号分隔,不传则使用默认")] = "",
periods: Annotated[str, Query(description="周期列表,逗号分隔,不传则使用默认")] = "",
):
"""
手动触发同步任务
- **sync_type**: 同步类型 (kline=K 线数据realtime=实时行情all=全部)
- **symbols**: 品种列表 (逗号分隔,如 IF2406,IC2406)
- **periods**: 周期列表 (逗号分隔,如 1m,5m,15m)
"""
from app.services.data_sync_service import DataSyncService
try:
# 解析品种列表
symbol_list = [s.strip() for s in symbols.split(",") if s.strip()] if symbols else None
# 解析周期列表
period_list = [p.strip() for p in periods.split(",") if p.strip()] if periods else None
if sync_type == "kline":
# 同步 K 线数据
if symbol_list:
results = []
for symbol in symbol_list:
for period in (period_list or DataSyncService.DEFAULT_PERIODS):
count = DataSyncService.sync_kline_data(symbol, period)
results.append({"symbol": symbol, "period": period, "count": count})
return ResponseData(
code=0,
message="Kline sync completed",
data={"results": results, "total": sum(r["count"] for r in results)}
)
else:
# 同步所有默认品种
result = await DataSyncService.sync_all_symbols()
return ResponseData(
code=0,
message="All symbols synced",
data=result
)
elif sync_type == "realtime":
# 同步实时行情
symbol_list = symbol_list or DataSyncService.DEFAULT_SYMBOLS
count = DataSyncService.sync_realtime_quotes(symbol_list)
return ResponseData(
code=0,
message="Realtime quotes synced",
data={"count": count, "symbols": symbol_list}
)
elif sync_type == "all":
# 同步全部
kline_result = await DataSyncService.sync_all_symbols()
realtime_count = DataSyncService.sync_realtime_quotes(
symbol_list or DataSyncService.DEFAULT_SYMBOLS
)
return ResponseData(
code=0,
message="All sync completed",
data={
"kline": kline_result,
"realtime": {"count": realtime_count}
}
)
else:
raise HTTPException(
status_code=400,
detail=f"Invalid sync_type: {sync_type}. Must be 'kline', 'realtime', or 'all'"
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Failed to trigger sync: {str(e)}"
)
@router.get("/logs", response_model=ResponseData)
async def get_sync_logs(
sync_type: Annotated[str, Query(description="同步类型kline/realtime/all")] = "",
symbol: Annotated[str, Query(description="品种代码")] = "",
status: Annotated[str, Query(description="状态success/failed/all")] = "all",
limit: Annotated[int, Query(description="返回数量,默认 50最大 200")] = 50,
db: Session = Depends(get_sqlite_db)
):
"""查询同步日志"""
from sqlalchemy import text
try:
# 限制 limit 最大值
if limit > 200:
limit = 200
if limit < 1:
limit = 1
# 构建查询
query = "SELECT * FROM sync_log WHERE 1=1"
params = {}
if sync_type:
query += " AND sync_type = :sync_type"
params["sync_type"] = sync_type
if symbol:
query += " AND symbol = :symbol"
params["symbol"] = symbol
if status != "all":
query += " AND status = :status"
params["status"] = status
query += " ORDER BY start_time DESC LIMIT :limit"
params["limit"] = limit
result = db.execute(text(query), params)
logs = []
for row in result:
logs.append({
"id": row[0],
"sync_type": row[1],
"symbol": row[2],
"period": row[3],
"start_time": row[4],
"end_time": row[5],
"status": row[6],
"records_synced": row[7],
"error_message": row[8],
"created_at": row[9]
})
return ResponseData(
code=0,
message="success",
data={"logs": logs, "count": len(logs)}
)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Failed to get sync logs: {str(e)}"
)
@router.get("/status", response_model=ResponseData)
async def get_sync_status():
"""获取同步状态"""
from app.tasks.sync_tasks import get_scheduler
try:
scheduler = get_scheduler()
# 获取所有定时任务
jobs = scheduler.get_jobs()
job_status = []
for job in jobs:
job_status.append({
"id": job.id,
"name": job.name,
"next_run": str(job.next_run_time) if job.next_run_time else None,
"trigger": str(job.trigger)
})
return ResponseData(
code=0,
message="success",
data={
"scheduler_running": scheduler.running,
"jobs": job_status,
"job_count": len(jobs)
}
)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Failed to get sync status: {str(e)}"
)