You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

137 lines
4.5 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
Database migration script - v2.0
Updates the database schema to support the caching and sync features
"""
import logging
from sqlalchemy import text
from app.db.init_db import SQLiteSessionLocal, TimescaleSessionLocal
logger = logging.getLogger(__name__)
def init_sync_tables():
    """Create the sync bookkeeping tables in SQLite and seed default settings.

    Within a single session from ``SQLiteSessionLocal`` this creates the
    ``sync_config``, ``sync_log`` and ``api_log`` tables when they are
    missing, inserts the default ``sync_config`` rows with
    ``INSERT OR IGNORE``, and commits once at the end.
    """
    logger.info("Initializing sync tables in SQLite...")

    # Sync configuration table (key/value settings).
    create_sync_config = (
        "\n"
        "CREATE TABLE IF NOT EXISTS sync_config (\n"
        "id INTEGER PRIMARY KEY AUTOINCREMENT,\n"
        "config_key VARCHAR(50) UNIQUE NOT NULL,\n"
        "config_value TEXT,\n"
        "description TEXT,\n"
        "updated_at DATETIME DEFAULT CURRENT_TIMESTAMP\n"
        ")\n"
    )
    # Sync log table (one row per sync run).
    create_sync_log = (
        "\n"
        "CREATE TABLE IF NOT EXISTS sync_log (\n"
        "id INTEGER PRIMARY KEY AUTOINCREMENT,\n"
        "sync_type VARCHAR(20) NOT NULL,\n"
        "symbol VARCHAR(20),\n"
        "period VARCHAR(10),\n"
        "start_time DATETIME,\n"
        "end_time DATETIME,\n"
        "records_count INTEGER,\n"
        "status VARCHAR(20) NOT NULL,\n"
        "error_message TEXT,\n"
        "created_at DATETIME DEFAULT CURRENT_TIMESTAMP\n"
        ")\n"
    )
    # API call log table.
    create_api_log = (
        "\n"
        "CREATE TABLE IF NOT EXISTS api_log (\n"
        "id INTEGER PRIMARY KEY AUTOINCREMENT,\n"
        "endpoint VARCHAR(100) NOT NULL,\n"
        "method VARCHAR(10),\n"
        "request_params TEXT,\n"
        "response_code INTEGER,\n"
        "cache_hit BOOLEAN,\n"
        "duration_ms INTEGER,\n"
        "created_at DATETIME DEFAULT CURRENT_TIMESTAMP\n"
        ")\n"
    )
    # OR IGNORE together with the UNIQUE config_key means rows that already
    # exist are left as they are.
    insert_default = (
        "\n"
        "INSERT OR IGNORE INTO sync_config (config_key, config_value, description)\n"
        "VALUES (:key, :value, :desc)\n"
    )

    # Default settings as (config_key, config_value, description).
    # The descriptions are data written to the table, so they are kept verbatim.
    seed_rows = (
        ('sync_enabled', 'true', '是否启用定时同步'),
        ('sync_time', '17:00', '同步时间 (HH:MM)'),
        ('sync_weekdays', '1,2,3,4,5', '同步工作日 (1-5 表示周一至周五)'),
        ('stock_symbols', '600126.SH,000001.SZ,600519.SH', '股票同步列表'),
        ('future_symbols', 'IF2406,IC2406,IH2406,IM2406', '期货同步列表'),
        ('sync_periods', '5m,15m,30m,60m,1d,1w', '同步周期列表'),
        ('cache_days', '365', '缓存天数'),
    )

    with SQLiteSessionLocal() as session:
        for ddl in (create_sync_config, create_sync_log, create_api_log):
            session.execute(text(ddl))

        insert_stmt = text(insert_default)
        for config_key, config_value, description in seed_rows:
            bind_params = {
                "key": config_key,
                "value": config_value,
                "desc": description,
            }
            session.execute(insert_stmt, bind_params)

        session.commit()
        logger.info("Sync tables initialized successfully")
def init_kline_table():
    """Create the ``kline_data`` table in TimescaleDB and register it as a hypertable.

    Runs three statements in one session from ``TimescaleSessionLocal``:
    the table creation, ``create_hypertable`` on the ``time`` column, and a
    ``(symbol, period, time DESC)`` index. It then commits once. Each
    statement carries its own "if not exists" guard.
    """
    logger.info("Initializing kline_data table in TimescaleDB...")

    # K-line (candlestick) data table, created only if it does not exist.
    create_table_sql = (
        "\n"
        "CREATE TABLE IF NOT EXISTS kline_data (\n"
        "time TIMESTAMPTZ NOT NULL,\n"
        "symbol VARCHAR(20) NOT NULL,\n"
        "period VARCHAR(10) NOT NULL,\n"
        "open NUMERIC(20, 4),\n"
        "high NUMERIC(20, 4),\n"
        "low NUMERIC(20, 4),\n"
        "close NUMERIC(20, 4),\n"
        "volume NUMERIC(30, 8),\n"
        "amount NUMERIC(30, 8),\n"
        "open_interest NUMERIC(30, 8),\n"
        "PRIMARY KEY (time, symbol, period)\n"
        ")\n"
    )
    # Convert to a hypertable if it is not one already.
    hypertable_sql = (
        "\n"
        "SELECT create_hypertable('kline_data', 'time', if_not_exists => TRUE)\n"
    )
    # Lookup index.
    index_sql = (
        "\n"
        "CREATE INDEX IF NOT EXISTS idx_kline_symbol_period\n"
        "ON kline_data (symbol, period, time DESC)\n"
    )

    with TimescaleSessionLocal() as session:
        # Order matters: the table must exist before the hypertable
        # conversion and the index creation reference it.
        for statement in (create_table_sql, hypertable_sql, index_sql):
            session.execute(text(statement))
        session.commit()
        logger.info("Kline data table initialized successfully")
def run_migrations():
    """Run all v2.0 migration steps.

    Initializes the TimescaleDB kline table first, then the SQLite sync
    tables.

    Returns:
        True when both steps complete without raising.

    Raises:
        Exception: whatever a migration step raised, re-raised unchanged
            after it has been logged.
    """
    logger.info("Starting database migrations for v2.0...")
    try:
        # TimescaleDB table first, then the SQLite tables.
        init_kline_table()
        init_sync_tables()
    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback,
        # which a plain logger.error call would drop.
        logger.exception("Migration failed: %s", e)
        raise
    else:
        logger.info("Database migrations completed successfully")
        return True
if __name__ == "__main__":
    # When run as a script, make the INFO-level progress messages visible:
    # with no handler configured, Python's last-resort handler only emits
    # WARNING and above. basicConfig does nothing if the root logger already
    # has handlers, so logging set up by an imported module is left alone.
    logging.basicConfig(level=logging.INFO)
    run_migrations()