You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

209 lines
6.3 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# backend/app/api/v2/websocket.py
"""
WebSocket API 接口
支持连接状态查询、订阅管理、推送统计
"""
from typing import Optional
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends, Query, HTTPException
from sqlalchemy.orm import Session
from app.db.base import get_db
from app.middleware.auth import get_current_user
from app.models.user import User
from app.websocket.connection_manager import connection_manager, websocket_handler
from app.services.push_service import push_service
from app.services.auth_service import verify_token
import logging
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v2/ws", tags=["WebSocket 服务"])
# ============== WebSocket 连接路由 ==============
@router.websocket("/quote")
async def websocket_quote_endpoint(
websocket: WebSocket,
token: str = Query(..., description="认证 Token"),
db: Session = Depends(get_db)
):
"""
WebSocket 行情推送连接
连接地址: WS /api/v2/ws/quote?token={token}
客户端操作:
- subscribe: 订阅品种 {"action": "subscribe", "symbols": ["IF2406"]}
- unsubscribe: 取消订阅 {"action": "unsubscribe", "symbols": ["IF2406"]}
- heartbeat: 心跳 {"action": "heartbeat"}
- query: 查询订阅 {"action": "query"}
服务端推送:
- quote: 行情推送 {"type": "quote", "symbol": "IF2406", "data": {...}}
- kline: K 线推送 {"type": "kline", "symbol": "IF2406", "period": "1m", "data": {...}}
- system: 系统消息 {"type": "system", "event": "connected", ...}
"""
# 认证
user = await verify_token(token)
if not user:
await websocket.close(code=4001, reason="认证失败")
return
# 处理 WebSocket 消息
await websocket_handler(websocket, user.id)
# ============== HTTP APIWebSocket 管理) ==============
@router.get("/connections", summary="查询连接统计")
async def get_connection_statistics(
current_user: User = Depends(get_current_user)
):
"""
查询 WebSocket 连接统计
包括连接数、用户数、订阅数、消息数等
"""
try:
stats = connection_manager.get_statistics()
return {
"connections": stats,
"push_service": push_service.get_statistics()
}
except Exception as e:
logger.error(f"❌ 查询连接统计失败: {e}")
raise HTTPException(status_code=500, detail=f"查询连接统计失败: {str(e)}")
@router.get("/user/{user_id}/subscriptions", summary="查询用户订阅")
async def get_user_subscriptions(
user_id: int,
current_user: User = Depends(get_current_user)
):
"""
查询用户的 WebSocket 订阅列表
仅管理员或用户本人可查询
"""
try:
# 权限检查
if current_user.id != user_id and current_user.role != "admin":
raise HTTPException(status_code=403, detail="无权限查询其他用户的订阅")
subscriptions = connection_manager.get_user_subscriptions(user_id)
return {
"user_id": user_id,
"subscriptions": subscriptions,
"count": len(subscriptions)
}
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ 查询用户订阅失败: {e}")
raise HTTPException(status_code=500, detail=f"查询用户订阅失败: {str(e)}")
@router.get("/symbol/{symbol}/subscribers", summary="查询品种订阅用户")
async def get_symbol_subscribers(
symbol: str,
current_user: User = Depends(get_current_user)
):
"""
查询订阅某品种的用户列表
仅管理员可查询
"""
try:
# 权限检查
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="无权限查询品种订阅用户")
stats = connection_manager.get_statistics()
symbol_subscribers = stats.get("symbol_subscribers", {})
user_count = symbol_subscribers.get(symbol, 0)
return {
"symbol": symbol,
"subscriber_count": user_count
}
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ 查询品种订阅用户失败: {e}")
raise HTTPException(status_code=500, detail=f"查询品种订阅用户失败: {str(e)}")
@router.post("/broadcast", summary="广播系统消息")
async def broadcast_system_message(
message: dict,
current_user: User = Depends(get_current_user)
):
"""
广播系统消息
仅管理员可操作
"""
try:
# 权限检查
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="无权限广播消息")
# 广播消息
await connection_manager.broadcast({
"type": "system",
"event": "broadcast",
"message": message,
"time": datetime.now().isoformat()
})
logger.info(f"✅ 管理员 {current_user.id} 广播消息: {message}")
return {"status": "success", "message": "消息已广播"}
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ 广播消息失败: {e}")
raise HTTPException(status_code=500, detail=f"广播消息失败: {str(e)}")
@router.post("/publish/quote", summary="发布行情更新")
async def publish_quote_update(
symbol: str = Query(..., description="品种代码"),
quote_data: dict = None,
current_user: User = Depends(get_current_user)
):
"""
发布行情更新(手动触发)
仅管理员可操作
"""
try:
# 权限检查
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="无权限发布行情")
# 发布行情
await push_service.publish_quote(symbol, quote_data)
logger.info(f"✅ 管理员 {current_user.id} 发布行情: {symbol}")
return {"status": "success", "message": f"行情已发布: {symbol}"}
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ 发布行情失败: {e}")
raise HTTPException(status_code=500, detail=f"发布行情失败: {str(e)}")
# ============== 导入依赖 ==============
from datetime import datetime