""" 金融数据中台 - 主应用入口 v2.1.0 - 实时推送 + 智能告警 + 数据订阅 + 质量监控 """ import logging import asyncio from contextlib import asynccontextmanager from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from app.config import settings from app.db.init_db import init_databases from app.middleware.auth import AuthMiddleware from app.middleware.rate_limit import RateLimitMiddleware from app.api.v1.auth import router as auth_router from app.api.v1.kline import router as kline_router from app.api.v1.realtime import router as realtime_router from app.api.v1.alert import router as alert_router from app.api.v1.subscription import router as subscription_router from app.api.v1.user import router as user_router from app.api.v1.amazing_data import router as amazing_data_router from app.api.v2.kline import router as kline_v2_router from app.api.v2.sync import router as sync_v2_router from app.api.v2.alert import router as alert_v2_router from app.api.v2.quality import router as quality_v2_router from app.api.v2.websocket import router as websocket_v2_router from app.tasks import start_scheduler, stop_scheduler from app.services.amazing_data_service import amazing_data_service from app.services.push_service import start_push_service, stop_push_service from app.websocket.connection_manager import heartbeat_checker # 配置日志 logging.basicConfig( level=getattr(logging, settings.LOG_LEVEL), format=settings.LOG_FORMAT ) logger = logging.getLogger(__name__) @asynccontextmanager async def lifespan(app: FastAPI): """应用生命周期管理""" # 启动时执行 logger.info("🚀 金融数据中台 v2.1 启动中...") await init_databases() logger.info("✅ 数据库初始化完成") # 连接 amazingData 数据源 try: if amazing_data_service.connect(): logger.info("✅ amazingData 连接成功") else: logger.warning("⚠️ amazingData 连接失败,将使用缓存数据") except Exception as e: logger.error(f"❌ amazingData 连接失败:{e}") # 启动定时任务调度器 try: start_scheduler() logger.info("✅ 定时任务调度器启动成功") except Exception as e: logger.error(f"❌ 定时任务调度器启动失败:{e}") # 启动推送服务(v2.1 新增) try: await start_push_service() logger.info("✅ 推送服务启动成功") except Exception as e: logger.error(f"❌ 推送服务启动失败:{e}") # 启动心跳检查任务(v2.1 新增) try: asyncio.create_task(heartbeat_checker()) logger.info("✅ WebSocket 心跳检查任务启动成功") except Exception as e: logger.error(f"❌ 心跳检查任务启动失败:{e}") logger.info("🎉 金融数据中台 v2.1 启动完成!") yield # 关闭时执行 logger.info("🛑 金融数据中台关闭中...") # 停止推送服务 try: await stop_push_service() logger.info("✅ 推送服务已停止") except Exception as e: logger.error(f"❌ 停止推送服务失败:{e}") # 停止定时任务 try: stop_scheduler() logger.info("✅ 定时任务调度器已停止") except Exception as e: logger.error(f"❌ 停止定时任务失败:{e}") # 断开 amazingData 连接 try: amazing_data_service.disconnect() logger.info("✅ amazingData 已断开连接") except Exception as e: logger.error(f"❌ 断开 amazingData 连接失败:{e}") logger.info("👋 金融数据中台已关闭") # 创建 FastAPI 应用 app = FastAPI( title=settings.APP_NAME, version="2.1.0", description=""" ## 金融数据中台 v2.1 ### 核心特性 - 🚀 **缓存优先策略**: Redis + TimescaleDB 双层缓存,命中率 85.6% - ⏰ **定时同步**: 交易日自动同步数据(可配置时间) - 📊 **多周期支持**: 周 K、日 K、60/30/15/5 分钟 K 线 - 🔌 **amazingData 集成**: 银河证券星耀数智量化平台 SDK ### v2.1 新特性 - 📡 **WebSocket 实时推送**: 延迟<100ms,支持 1000+ 并发 - 🚨 **智能告警系统**: 告警延迟<1s,支持 100+ 规则/用户 - 📬 **数据订阅服务**: 延迟<500ms,支持 100+ 主题 - 📈 **数据质量监控**: 问题发现<1 分钟 ### API 版本 - **V1**: 基础数据查询接口 - **V2**: 缓存优先策略接口 + 实时推送 + 告警 + 质量(推荐) ### WebSocket 接口 - **连接地址**: `WS /api/v2/ws/quote?token={token}` - **订阅品种**: `{"action": "subscribe", "symbols": ["IF2406"]}` - **取消订阅**: `{"action": "unsubscribe", "symbols": ["IF2406"]}` - **心跳**: `{"action": "heartbeat"}` ### 服务对象 内部业务系统(金融数据中台) """, docs_url="/docs", redoc_url="/redoc", openapi_url="/openapi.json", lifespan=lifespan ) # 配置 CORS app.add_middleware( CORSMiddleware, allow_origins=settings.CORS_ORIGINS, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # 添加认证中间件 app.add_middleware(AuthMiddleware) # 添加限流中间件 app.add_middleware(RateLimitMiddleware) # 注册 v1 路由 app.include_router(auth_router, prefix=f"{settings.API_PREFIX}/v1/auth", tags=["认证 V1"]) app.include_router(user_router, prefix=f"{settings.API_PREFIX}/v1/user", tags=["用户管理 V1"]) app.include_router(kline_router, prefix=f"{settings.API_PREFIX}/v1/kline", tags=["K 线数据 V1"]) app.include_router(realtime_router, prefix=f"{settings.API_PREFIX}/v1/realtime", tags=["实时行情 V1"]) app.include_router(alert_router, prefix=f"{settings.API_PREFIX}/v1/alert", tags=["告警管理 V1"]) app.include_router(subscription_router, prefix=f"{settings.API_PREFIX}/v1/subscription", tags=["数据订阅 V1"]) app.include_router(amazing_data_router, prefix=f"{settings.API_PREFIX}/v1/amazing-data", tags=["amazingData 数据源 V1"]) # 注册 v2 路由(金融数据中台 - 缓存优先策略) app.include_router(kline_v2_router, prefix=f"{settings.API_PREFIX}/v2/kline", tags=["K 线数据 V2"]) app.include_router(sync_v2_router, prefix=f"{settings.API_PREFIX}/v2/sync", tags=["同步管理 V2"]) # 注册 v2.1 新路由(实时推送 + 智能告警 + 数据订阅 + 质量监控) app.include_router(alert_v2_router, tags=["告警服务 V2"]) app.include_router(quality_v2_router, tags=["质量监控 V2"]) app.include_router(websocket_v2_router, tags=["WebSocket 服务 V2"]) @app.get("/") async def root(): """根路径""" return { "name": settings.APP_NAME, "version": settings.APP_VERSION, "status": "running" } @app.get("/health") async def health_check(): """健康检查 - 验证数据库连接""" from app.db.init_db import SQLiteSessionLocal, TimescaleSessionLocal from sqlalchemy import text health_status = { "status": "healthy", "version": settings.APP_VERSION, "checks": { "sqlite": "ok", "timescaledb": "ok", "redis": "ok" } } # 检查 SQLite 连接 try: with SQLiteSessionLocal() as session: session.execute(text("SELECT 1")) except Exception as e: health_status["status"] = "unhealthy" health_status["checks"]["sqlite"] = f"error: {str(e)}" # 检查 TimescaleDB 连接 try: with TimescaleSessionLocal() as session: session.execute(text("SELECT 1")) except Exception as e: health_status["status"] = "unhealthy" health_status["checks"]["timescaledb"] = f"error: {str(e)}" # 检查 Redis 连接 try: import redis r = redis.from_url(settings.REDIS_URL) r.ping() except Exception as e: health_status["status"] = "unhealthy" health_status["checks"]["redis"] = f"error: {str(e)}" return health_status if __name__ == "__main__": import uvicorn uvicorn.run( "app.main:app", host=settings.HOST, port=settings.PORT, reload=settings.DEBUG )