You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

197 lines
6.3 KiB

"""
数据库初始化脚本
"""
import logging
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from app.config import settings
logger = logging.getLogger(__name__)

# TimescaleDB engine and session factory (time-series data).
# The connection URL is read from settings.TIMESCALE_DB_URL.
timescale_engine = create_engine(settings.TIMESCALE_DB_URL)
TimescaleSessionLocal = sessionmaker(
    bind=timescale_engine,
    autoflush=False,
    autocommit=False,
)

# SQLite engine and session factory (configuration data).
# The database file location is read from settings.SQLITE_DB_PATH.
# check_same_thread=False is forwarded to the sqlite3 driver so that a
# connection is not restricted to the thread that created it.
sqlite_engine = create_engine(
    f"sqlite:///{settings.SQLITE_DB_PATH}",
    connect_args={"check_same_thread": False},
)
SQLiteSessionLocal = sessionmaker(
    bind=sqlite_engine,
    autoflush=False,
    autocommit=False,
)
async def init_timescale_db():
    """Create the TimescaleDB tables, hypertables and indexes.

    Every statement uses an ``IF NOT EXISTS`` / ``if_not_exists`` form.
    All statements run in order on a single connection taken from
    ``timescale_engine`` and are committed once at the end.

    NOTE: although this is a coroutine, the body contains no ``await``;
    the statements are executed through the synchronous engine, so the
    caller's event loop does not get control back until the function
    returns.
    """
    logger.info("初始化 TimescaleDB...")

    # Order matters: a table has to exist before it is turned into a
    # hypertable or indexed.
    schema_statements = (
        # K-line (candlestick) data table.
        """
CREATE TABLE IF NOT EXISTS kline_data (
time TIMESTAMPTZ NOT NULL,
symbol VARCHAR(20) NOT NULL,
period VARCHAR(10) NOT NULL,
open NUMERIC(20, 8) NOT NULL,
high NUMERIC(20, 8) NOT NULL,
low NUMERIC(20, 8) NOT NULL,
close NUMERIC(20, 8) NOT NULL,
volume BIGINT NOT NULL,
amount NUMERIC(30, 8) DEFAULT 0,
open_interest BIGINT DEFAULT 0,
created_at TIMESTAMPTZ DEFAULT NOW()
)
""",
        # Convert it into a hypertable (TimescaleDB feature) keyed on "time".
        """
SELECT create_hypertable('kline_data', 'time', if_not_exists => TRUE)
""",
        # Index for per-symbol / per-period lookups, newest rows first.
        """
CREATE INDEX IF NOT EXISTS idx_kline_symbol_period
ON kline_data (symbol, period, time DESC)
""",
        # Real-time quotes table.
        """
CREATE TABLE IF NOT EXISTS realtime_quotes (
time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
symbol VARCHAR(20) NOT NULL,
last_price NUMERIC(20, 8) NOT NULL,
open_price NUMERIC(20, 8),
high_price NUMERIC(20, 8),
low_price NUMERIC(20, 8),
prev_close NUMERIC(20, 8),
volume BIGINT,
amount NUMERIC(30, 8),
bid_price_1 NUMERIC(20, 8),
bid_volume_1 BIGINT,
ask_price_1 NUMERIC(20, 8),
ask_volume_1 BIGINT,
position BIGINT DEFAULT 0
)
""",
        # Hypertable conversion for the quotes table, also keyed on "time".
        """
SELECT create_hypertable('realtime_quotes', 'time', if_not_exists => TRUE)
""",
        # Index for per-symbol lookups, newest rows first.
        """
CREATE INDEX IF NOT EXISTS idx_realtime_symbol
ON realtime_quotes (symbol, time DESC)
""",
    )

    with timescale_engine.connect() as conn:
        for statement in schema_statements:
            conn.execute(text(statement))
        # Single commit once every statement has succeeded.
        conn.commit()

    logger.info("TimescaleDB 初始化完成")
async def init_sqlite_db():
    """Create the SQLite configuration tables and seed the admin row.

    Creates ``users``, ``api_keys``, ``alerts`` and ``subscriptions`` with
    ``CREATE TABLE IF NOT EXISTS``, then inserts an ``admin`` user with
    ``INSERT OR IGNORE``. All statements run in order on a single
    connection taken from ``sqlite_engine`` and are committed once at the
    end.

    NOTE: although this is a coroutine, the body contains no ``await``;
    the statements are executed through the synchronous engine.
    """
    logger.info("初始化 SQLite...")

    # "users" comes first because the other three tables declare a
    # foreign key that references users(id).
    setup_statements = (
        # Users table.
        """
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username VARCHAR(50) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
email VARCHAR(100),
role VARCHAR(20) DEFAULT 'user',
is_active BOOLEAN DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""",
        # API key table.
        """
CREATE TABLE IF NOT EXISTS api_keys (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
key_hash VARCHAR(255) UNIQUE NOT NULL,
name VARCHAR(100),
permissions TEXT,
expires_at TIMESTAMP,
is_active BOOLEAN DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_used_at TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id)
)
""",
        # Alert configuration table.
        """
CREATE TABLE IF NOT EXISTS alerts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
symbol VARCHAR(20) NOT NULL,
condition_type VARCHAR(20) NOT NULL,
condition_value NUMERIC(20, 8) NOT NULL,
alert_type VARCHAR(20) DEFAULT 'price',
status VARCHAR(20) DEFAULT 'active',
triggered_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id)
)
""",
        # Subscription configuration table.
        """
CREATE TABLE IF NOT EXISTS subscriptions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
symbol VARCHAR(20) NOT NULL,
period VARCHAR(10),
subscription_type VARCHAR(20) DEFAULT 'kline',
is_active BOOLEAN DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id)
)
""",
        # Default administrator account. OR IGNORE leaves an existing
        # 'admin' row untouched (username is UNIQUE).
        # NOTE(review): the stored hash is a fixed literal whose salt and
        # digest parts ('defaultsalt' / 'defaultHash') look like
        # placeholders rather than a real digest -- confirm how the admin
        # credential is meant to be provisioned.
        """
INSERT OR IGNORE INTO users (username, password_hash, email, role)
VALUES ('admin', 'pbkdf2:sha256:260000$defaultsalt$defaultHash', 'admin@example.com', 'admin')
""",
    )

    with sqlite_engine.connect() as conn:
        for statement in setup_statements:
            conn.execute(text(statement))
        # Single commit once every statement has succeeded.
        conn.commit()

    logger.info("SQLite 初始化完成")
async def init_databases():
    """Initialize every database: TimescaleDB first, then SQLite.

    The two initializers are awaited one after the other, so an exception
    raised by the first prevents the second from running.
    """
    for initialize in (init_timescale_db, init_sqlite_db):
        await initialize()
# Dependency-injection helpers
def get_timescale_db():
    """Yield a TimescaleDB session and close it afterwards.

    Generator: a new session is created from ``TimescaleSessionLocal``,
    yielded to the consumer, and closed in ``finally`` once the generator
    is resumed, closed, or an exception is thrown into it.
    """
    session = TimescaleSessionLocal()
    try:
        yield session
    finally:
        # Runs whether or not the consumer raised.
        session.close()
def get_sqlite_db():
    """Yield a SQLite session and close it afterwards.

    Generator: a new session is created from ``SQLiteSessionLocal``,
    yielded to the consumer, and closed in ``finally`` once the generator
    is resumed, closed, or an exception is thrown into it.
    """
    session = SQLiteSessionLocal()
    try:
        yield session
    finally:
        # Runs whether or not the consumer raised.
        session.close()