# 金融数据中台 - Phase 1 开发任务 **项目**: 20260330_kline_system **版本**: v2.0 **日期**: 2026-04-03 **优先级**: 🔴 高 --- ## 📋 Phase 1 开发任务清单 ### 1. 数据库 Schema 更新 #### 1.1 TimescaleDB - K 线数据表 **文件**: `backend/db/migrations/002_create_kline_data.sql` ```sql -- K 线数据表 CREATE TABLE IF NOT EXISTS kline_data ( time TIMESTAMPTZ NOT NULL, symbol VARCHAR(20) NOT NULL, security_type VARCHAR(20) NOT NULL, period VARCHAR(10) NOT NULL, open DECIMAL(20, 4) NOT NULL, high DECIMAL(20, 4) NOT NULL, low DECIMAL(20, 4) NOT NULL, close DECIMAL(20, 4) NOT NULL, volume BIGINT, turnover DECIMAL(20, 4), open_interest BIGINT, created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW() ); -- 创建 hypertable SELECT create_hypertable('kline_data', 'time', if_not_exists => TRUE); -- 创建索引 CREATE INDEX IF NOT EXISTS idx_kline_symbol_period ON kline_data (symbol, period, time DESC); ``` #### 1.2 SQLite - 配置表 **文件**: `backend/db/migrations/003_create_config_tables.sql` ```sql -- 同步配置表 CREATE TABLE IF NOT EXISTS sync_config ( id INTEGER PRIMARY KEY AUTOINCREMENT, config_key VARCHAR(50) UNIQUE NOT NULL, config_value TEXT NOT NULL, description TEXT, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ); -- 同步日志表 CREATE TABLE IF NOT EXISTS sync_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, sync_type VARCHAR(20) NOT NULL, symbol VARCHAR(20), period VARCHAR(10), start_time DATETIME, end_time DATETIME, records_count INTEGER, status VARCHAR(20) NOT NULL, error_message TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ); -- API 调用日志表 CREATE TABLE IF NOT EXISTS api_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, endpoint VARCHAR(50) NOT NULL, symbol VARCHAR(20), period VARCHAR(10), cache_hit BOOLEAN, response_time_ms INTEGER, client_ip VARCHAR(50), created_at DATETIME DEFAULT CURRENT_TIMESTAMP ); ``` --- ### 2. 实现 CacheService **文件**: `backend/app/services/cache_service.py` ```python """ K 线数据缓存服务 """ from datetime import datetime, timedelta from typing import List, Optional from sqlalchemy.orm import Session from app.models.kline import KlineData from app.db.init_db import get_timescale_db class CacheService: """K 线数据缓存服务""" def __init__(self, db_session: Session): self.db = db_session def get_kline( self, symbol: str, period: str, start_time: datetime, end_time: datetime, security_type: str = "STOCK_A" ) -> Optional[List[KlineData]]: """从缓存获取 K 线数据""" pass def set_kline( self, symbol: str, period: str, data: List[KlineData], security_type: str = "STOCK_A" ) -> bool: """写入 K 线数据到缓存""" pass def clear_cache( self, symbol: str, period: str, security_type: str = "STOCK_A" ) -> bool: """清除指定缓存""" pass ``` --- ### 3. 实现 SyncService **文件**: `backend/app/services/sync_service.py` ```python """ 定时同步服务 """ from datetime import datetime from typing import List, Dict from app.services.cache_service import CacheService from app.services.amazing_data_service import amazing_data_service from app.db.init_db import get_sqlite_db, get_timescale_db class SyncService: """定时同步服务""" def __init__(self): self.sqlite_db = get_sqlite_db() self.timescale_db = get_timescale_db() self.cache_service = CacheService(self.timescale_db) def sync_symbol( self, symbol: str, security_type: str, periods: List[str], start_date: str, end_date: str ) -> Dict: """同步单个品种数据""" pass def sync_all(self) -> List[Dict]: """同步所有配置品种""" pass def get_sync_config(self) -> Dict: """获取同步配置""" pass def update_sync_config(self, config: Dict) -> bool: """更新同步配置""" pass ``` --- ### 4. 更新 KlineService **文件**: `backend/app/services/kline_service_v2.py` ```python """ K 线数据服务 v2 (缓存优先策略) """ from datetime import datetime from typing import List, Optional from app.services.cache_service import CacheService from app.services.amazing_data_service import amazing_data_service class KlineServiceV2: """K 线数据服务 v2""" def __init__(self): self.timescale_db = get_timescale_db() self.cache_service = CacheService(self.timescale_db) def get_kline_data( self, symbol: str, period: str, start_time: datetime, end_time: datetime, security_type: str = "STOCK_A" ) -> List[Dict]: """ 获取 K 线数据(缓存优先) 流程: 1. 查询缓存 2. 缓存命中则返回 3. 缓存未命中则调用 amazingData 获取并缓存 4. 返回数据 """ pass ``` --- ### 5. 实现定时任务调度 **文件**: `backend/app/tasks/sync_scheduler.py` ```python """ 定时同步任务调度器 """ from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger from app.services.sync_service import SyncService scheduler = BackgroundScheduler() def init_scheduler(): """初始化调度器""" sync_service = SyncService() # 从配置读取同步时间 config = sync_service.get_sync_config() hour, minute = config['sync_time'].split(':') # 添加定时任务 scheduler.add_job( func=sync_service.sync_all, trigger=CronTrigger(hour=hour, minute=minute, day_of_week='1-5'), id='daily_sync', name='每日数据同步', replace_existing=True ) scheduler.start() def shutdown_scheduler(): """关闭调度器""" scheduler.shutdown() ``` --- ### 6. 更新 API 接口 **文件**: `backend/app/api/v2/kline.py` ```python """ K 线数据 API v2 """ from fastapi import APIRouter, Query from datetime import datetime from app.services.kline_service_v2 import KlineServiceV2 router = APIRouter(prefix="/api/v2", tags=["K 线数据 V2"]) @router.get("/kline") async def get_kline( symbol: str = Query(..., description="证券代码"), period: str = Query(..., description="周期"), start: datetime = Query(..., description="开始时间"), end: datetime = Query(..., description="结束时间") ): """获取 K 线数据(缓存优先)""" pass @router.get("/kline/cache-stats") async def get_cache_stats(): """获取缓存统计信息""" pass ``` **文件**: `backend/app/api/v2/sync.py` ```python """ 同步管理 API v2 """ from fastapi import APIRouter from app.services.sync_service import SyncService router = APIRouter(prefix="/api/v2", tags=["同步管理 V2"]) @router.post("/sync/trigger") async def trigger_sync(): """手动触发同步""" pass @router.get("/sync/config") async def get_sync_config(): """获取同步配置""" pass @router.put("/sync/config") async def update_sync_config(config: dict): """更新同步配置""" pass @router.get("/sync/logs") async def get_sync_logs(limit: int = 50): """获取同步日志""" pass ``` --- ### 7. 更新主应用 **文件**: `backend/app/main.py` ```python # 添加 v2 路由 from app.api.v2.kline import router as kline_v2_router from app.api.v2.sync import router as sync_v2_router from app.tasks.sync_scheduler import init_scheduler, shutdown_scheduler @asynccontextmanager async def lifespan(app: FastAPI): # 启动时初始化 init_databases() init_scheduler() # 初始化定时任务 yield # 关闭时清理 shutdown_scheduler() app = FastAPI(lifespan=lifespan) # 注册 v2 路由 app.include_router(kline_v2_router) app.include_router(sync_v2_router) ``` --- ### 8. 更新依赖 **文件**: `backend/requirements.txt` ``` # 添加 APScheduler APScheduler==3.10.4 ``` --- ## 📊 开发优先级 | 任务 | 优先级 | 预计时间 | 依赖 | |------|--------|----------|------| | 数据库 Schema | 🔴 P0 | 1 小时 | - | | CacheService | 🔴 P0 | 2 小时 | 数据库 | | SyncService | 🔴 P0 | 3 小时 | CacheService | | 定时任务调度 | 🟠 P1 | 2 小时 | SyncService | | KlineServiceV2 | 🔴 P0 | 2 小时 | CacheService | | API v2 接口 | 🟠 P1 | 3 小时 | Services | | 主应用更新 | 🟠 P1 | 1 小时 | API | **总预计时间**: 14 小时 --- ## ✅ 验收标准 1. **数据库**: - [ ] kline_data 表创建成功 - [ ] sync_config 表创建成功 - [ ] sync_log 表创建成功 - [ ] api_log 表创建成功 2. **缓存服务**: - [ ] 可以查询缓存数据 - [ ] 可以写入缓存数据 - [ ] 可以清除缓存 3. **同步服务**: - [ ] 可以手动同步单个品种 - [ ] 可以同步所有品种 - [ ] 可以获取/更新配置 4. **定时任务**: - [ ] 定时任务正常启动 - [ ] 定时任务按时执行 - [ ] 同步日志正常记录 5. **API 接口**: - [ ] /api/v2/kline 正常返回数据 - [ ] 缓存命中时响应快 - [ ] 缓存未命中时回源获取 - [ ] /api/v2/sync/* 接口正常 --- **创建时间**: 2026-04-03 **状态**: 待开发