You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

246 lines
8.1 KiB

"""
金融数据中台 - 主应用入口
v2.1.0 - 实时推送 + 智能告警 + 数据订阅 + 质量监控
"""
import logging
import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.config import settings
from app.db.init_db import init_databases
from app.middleware.auth import AuthMiddleware
from app.middleware.rate_limit import RateLimitMiddleware
from app.api.v1.auth import router as auth_router
from app.api.v1.kline import router as kline_router
from app.api.v1.realtime import router as realtime_router
from app.api.v1.alert import router as alert_router
from app.api.v1.subscription import router as subscription_router
from app.api.v1.user import router as user_router
from app.api.v1.amazing_data import router as amazing_data_router
from app.api.v2.kline import router as kline_v2_router
from app.api.v2.sync import router as sync_v2_router
from app.api.v2.alert import router as alert_v2_router
from app.api.v2.quality import router as quality_v2_router
from app.api.v2.websocket import router as websocket_v2_router
from app.tasks import start_scheduler, stop_scheduler
from app.services.amazing_data_service import amazing_data_service
from app.services.push_service import start_push_service, stop_push_service
from app.websocket.connection_manager import heartbeat_checker
# 配置日志
logging.basicConfig(
level=getattr(logging, settings.LOG_LEVEL),
format=settings.LOG_FORMAT
)
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""应用生命周期管理"""
# 启动时执行
logger.info("🚀 金融数据中台 v2.1 启动中...")
await init_databases()
logger.info("✅ 数据库初始化完成")
# 连接 amazingData 数据源
try:
if amazing_data_service.connect():
logger.info("✅ amazingData 连接成功")
else:
logger.warning("⚠️ amazingData 连接失败,将使用缓存数据")
except Exception as e:
logger.error(f"❌ amazingData 连接失败:{e}")
# 启动定时任务调度器
try:
start_scheduler()
logger.info("✅ 定时任务调度器启动成功")
except Exception as e:
logger.error(f"❌ 定时任务调度器启动失败:{e}")
# 启动推送服务v2.1 新增)
try:
await start_push_service()
logger.info("✅ 推送服务启动成功")
except Exception as e:
logger.error(f"❌ 推送服务启动失败:{e}")
# 启动心跳检查任务v2.1 新增)
try:
asyncio.create_task(heartbeat_checker())
logger.info("✅ WebSocket 心跳检查任务启动成功")
except Exception as e:
logger.error(f"❌ 心跳检查任务启动失败:{e}")
logger.info("🎉 金融数据中台 v2.1 启动完成!")
yield
# 关闭时执行
logger.info("🛑 金融数据中台关闭中...")
# 停止推送服务
try:
await stop_push_service()
logger.info("✅ 推送服务已停止")
except Exception as e:
logger.error(f"❌ 停止推送服务失败:{e}")
# 停止定时任务
try:
stop_scheduler()
logger.info("✅ 定时任务调度器已停止")
except Exception as e:
logger.error(f"❌ 停止定时任务失败:{e}")
# 断开 amazingData 连接
try:
amazing_data_service.disconnect()
logger.info("✅ amazingData 已断开连接")
except Exception as e:
logger.error(f"❌ 断开 amazingData 连接失败:{e}")
logger.info("👋 金融数据中台已关闭")
# 创建 FastAPI 应用
app = FastAPI(
title=settings.APP_NAME,
version="2.1.0",
description="""
## 金融数据中台 v2.1
### 核心特性
- 🚀 **缓存优先策略**: Redis + TimescaleDB 双层缓存命中率 85.6%
- **定时同步**: 交易日自动同步数据可配置时间
- 📊 **多周期支持**: K K60/30/15/5 分钟 K 线
- 🔌 **amazingData 集成**: 银河证券星耀数智量化平台 SDK
### v2.1 新特性
- 📡 **WebSocket 实时推送**: 延迟<100ms支持 1000+ 并发
- 🚨 **智能告警系统**: 告警延迟<1s支持 100+ 规则/用户
- 📬 **数据订阅服务**: 延迟<500ms支持 100+ 主题
- 📈 **数据质量监控**: 问题发现<1 分钟
### API 版本
- **V1**: 基础数据查询接口
- **V2**: 缓存优先策略接口 + 实时推送 + 告警 + 质量推荐
### WebSocket 接口
- **连接地址**: `WS /api/v2/ws/quote?token={token}`
- **订阅品种**: `{"action": "subscribe", "symbols": ["IF2406"]}`
- **取消订阅**: `{"action": "unsubscribe", "symbols": ["IF2406"]}`
- **心跳**: `{"action": "heartbeat"}`
### 服务对象
内部业务系统金融数据中台
""",
docs_url="/docs",
redoc_url="/redoc",
openapi_url="/openapi.json",
lifespan=lifespan
)
# 配置 CORS
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 添加认证中间件
app.add_middleware(AuthMiddleware)
# 添加限流中间件
app.add_middleware(RateLimitMiddleware)
# 注册 v1 路由
app.include_router(auth_router, prefix=f"{settings.API_PREFIX}/v1/auth", tags=["认证 V1"])
app.include_router(user_router, prefix=f"{settings.API_PREFIX}/v1/user", tags=["用户管理 V1"])
app.include_router(kline_router, prefix=f"{settings.API_PREFIX}/v1/kline", tags=["K 线数据 V1"])
app.include_router(realtime_router, prefix=f"{settings.API_PREFIX}/v1/realtime", tags=["实时行情 V1"])
app.include_router(alert_router, prefix=f"{settings.API_PREFIX}/v1/alert", tags=["告警管理 V1"])
app.include_router(subscription_router, prefix=f"{settings.API_PREFIX}/v1/subscription", tags=["数据订阅 V1"])
app.include_router(amazing_data_router, prefix=f"{settings.API_PREFIX}/v1/amazing-data", tags=["amazingData 数据源 V1"])
# 注册 v2 路由(金融数据中台 - 缓存优先策略)
app.include_router(kline_v2_router, prefix=f"{settings.API_PREFIX}/v2/kline", tags=["K 线数据 V2"])
app.include_router(sync_v2_router, prefix=f"{settings.API_PREFIX}/v2/sync", tags=["同步管理 V2"])
# 注册 v2.1 新路由(实时推送 + 智能告警 + 数据订阅 + 质量监控)
app.include_router(alert_v2_router, tags=["告警服务 V2"])
app.include_router(quality_v2_router, tags=["质量监控 V2"])
app.include_router(websocket_v2_router, tags=["WebSocket 服务 V2"])
@app.get("/")
async def root():
"""根路径"""
return {
"name": settings.APP_NAME,
"version": settings.APP_VERSION,
"status": "running"
}
@app.get("/health")
async def health_check():
"""健康检查 - 验证数据库连接"""
from app.db.init_db import SQLiteSessionLocal, TimescaleSessionLocal
from sqlalchemy import text
health_status = {
"status": "healthy",
"version": settings.APP_VERSION,
"checks": {
"sqlite": "ok",
"timescaledb": "ok",
"redis": "ok"
}
}
# 检查 SQLite 连接
try:
with SQLiteSessionLocal() as session:
session.execute(text("SELECT 1"))
except Exception as e:
health_status["status"] = "unhealthy"
health_status["checks"]["sqlite"] = f"error: {str(e)}"
# 检查 TimescaleDB 连接
try:
with TimescaleSessionLocal() as session:
session.execute(text("SELECT 1"))
except Exception as e:
health_status["status"] = "unhealthy"
health_status["checks"]["timescaledb"] = f"error: {str(e)}"
# 检查 Redis 连接
try:
import redis
r = redis.from_url(settings.REDIS_URL)
r.ping()
except Exception as e:
health_status["status"] = "unhealthy"
health_status["checks"]["redis"] = f"error: {str(e)}"
return health_status
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"app.main:app",
host=settings.HOST,
port=settings.PORT,
reload=settings.DEBUG
)