You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

526 lines
12 KiB

# 金融数据中台 v2.1 - 开发任务清单
**项目**: 20260330_kline_system
**版本**: v2.1.0
**创建日期**: 2026-04-03
**预计完成**: 2026-04-15
**当前状态**: 规划中
---
## 📋 任务总览
| 阶段 | 任务数 | 已完成 | 进度 |
|------|--------|--------|------|
| 后端开发 | 9 | 0 | 0% |
| 前端开发 | 3 | 0 | 0% |
| 测试 | 5 | 0 | 0% |
| 文档 | 2 | 0 | 0% |
| **总计** | **19** | **0** | **0%** |
---
## 🔴 P0 - 核心功能13 天)
### 1. WebSocket 实时推送3 天)
#### 1.1 WebSocket 连接管理2 天)
**文件**: `backend/app/websocket/quote_pusher.py`
**任务详情**:
- [ ] 创建 WebSocket 路由 `/ws/v1/quote`
- [ ] 实现认证中间件Bearer Token
- [ ] 实现心跳机制30 秒间隔)
- [ ] 实现连接管理(存储连接、断开清理)
- [ ] 实现重连逻辑(指数退避)
- [ ] 编写单元测试
**技术要点**:
```python
# FastAPI WebSocket
from fastapi import WebSocket, WebSocketDisconnect
@app.websocket("/ws/v1/quote")
async def websocket_endpoint(websocket: WebSocket, token: str):
# 认证
user = await authenticate(token)
if not user:
await websocket.close(code=4001)
return
await websocket.accept()
# 连接管理
connection_manager.add_connection(user.id, websocket)
try:
while True:
data = await websocket.receive_json()
await handle_message(user.id, data)
except WebSocketDisconnect:
connection_manager.remove_connection(user.id)
```
**验收标准**:
- [ ] 支持 1000+ 并发连接
- [ ] 握手延迟 <100ms
- [ ] 心跳正常90 秒无心跳自动断开
- [ ] 认证失败正确返回错误码
---
#### 1.2 WebSocket 推送服务1 天)
**文件**: `backend/app/services/push_service.py`
**任务详情**:
- [ ] 实现订阅管理subscribe/unsubscribe
- [ ] 实现行情推送(从 Redis Pub/Sub 接收)
- [ ] 实现 K 线推送
- [ ] 实现系统消息推送
- [ ] 编写单元测试
**数据格式**:
```json
{
"type": "quote",
"symbol": "IF2406",
"data": {
"time": "2026-04-03T10:30:00.123Z",
"price": 3850.5,
"change": 0.5,
"change_percent": 0.13
}
}
```
**验收标准**:
- [ ] 推送延迟 <50ms
- [ ] 消息丢失率 <0.01%
- [ ] 支持 JSON 格式推送
---
### 2. 智能告警系统3 天)
#### 2.1 告警规则引擎2 天)
**文件**: `backend/app/services/alert_engine.py`
**任务详情**:
- [ ] 设计规则数据结构
- [ ] 实现规则解析器
- [ ] 实现规则计算器(实时数据 + 规则 → 布尔值)
- [ ] 实现规则调度(每分钟执行)
- [ ] 实现规则缓存
- [ ] 编写单元测试
**规则示例**:
```python
class AlertRule:
id: int
user_id: int
name: str
symbol: str
type: str # price, change_percent, technical, volume
condition: str # "price > 3900"
channels: List[str] # ["站内消息", "邮件"]
enabled: bool
start_time: time
end_time: time
repeat_interval: int # 秒
```
**验收标准**:
- [ ] 支持 100+ 规则/用户
- [ ] 规则计算延迟 <100ms
- [ ] 支持并发计算 >1000 规则/秒
---
#### 2.2 告警通知服务1 天)
**文件**: `backend/app/services/alert_notification.py`
**任务详情**:
- [ ] 实现站内消息通知
- [ ] 实现邮件通知SMTP
- [ ] 实现企业微信通知Webhook
- [ ] 实现钉钉通知Webhook
- [ ] 实现短信通知(阿里云 SMS
- [ ] 实现通知去重repeat_interval
- [ ] 编写单元测试
**通知模板**:
```
【金融数据中台】告警通知
告警名称IF2406 价格突破
触发时间2026-04-03 10:30:00
触发条件:价格 > 3900
当前价格3901.5
查看详情https://your-system.com/alerts/12345
```
**验收标准**:
- [ ] 通知延迟 <1s
- [ ] 通知到达率 >99%
- [ ] 支持 5 种通知渠道
---
### 3. 数据订阅服务2 天)
#### 3.1 Redis Stream 服务2 天)
**文件**: `backend/app/services/subscription_service.py`
**任务详情**:
- [ ] 设计订阅主题格式
- [ ] 实现主题管理(创建/删除)
- [ ] 实现消息发布publish
- [ ] 实现消息消费XREADGROUP
- [ ] 实现消费者组管理
- [ ] 实现消息确认XACK
- [ ] 实现消息清理24 小时)
- [ ] 编写单元测试
**主题格式**:
```
kline.update.{symbol}.{period}
quote.update.{symbol}
sync.complete
alert.trigger.{alert_id}
data.quality.issue
```
**验收标准**:
- [ ] 订阅延迟 <500ms
- [ ] 支持 100+ 主题
- [ ] 支持 10+ 消费者组
- [ ] 消息保留 24 小时
---
### 4. 数据质量监控2 天)
#### 4.1 质量监控服务2 天)
**文件**: `backend/app/services/quality_monitor.py`
**任务详情**:
- [ ] 设计监控指标(完整性、准确性、及时性、一致性)
- [ ] 实现完整性检测(数据缺失)
- [ ] 实现准确性检测(价格异常)
- [ ] 实现及时性检测(数据延迟)
- [ ] 实现一致性检测(缓存 vs 数据库)
- [ ] 实现质量评分计算
- [ ] 实现告警触发
- [ ] 编写单元测试
**监控指标**:
```python
completeness_score = (1 - missing_periods / total_periods) * 100
accuracy_score = (1 - abnormal_records / total_records) * 100
timeliness_score = (1 - delayed_periods / total_periods) * 100
consistency_score = (1 - inconsistent_records / total_records) * 100
overall_score = (completeness + accuracy + timeliness + consistency) / 4
```
**验收标准**:
- [ ] 问题发现 <1 分钟
- [ ] 质量评分准确
- [ ] 支持自定义监控规则
---
### 5. API 接口实现2 天)
#### 5.1 WebSocket API0.5 天)
**文件**: `backend/app/api/v2/websocket.py`
**任务详情**:
- [ ] 实现连接状态查询
- [ ] 实现订阅管理 API
- [ ] 实现推送统计 API
---
#### 5.2 告警 API0.5 天)
**文件**: `backend/app/api/v2/alert.py`
**任务详情**:
- [ ] POST /api/v2/alert/rules - 创建告警
- [ ] GET /api/v2/alert/rules - 查询告警列表
- [ ] PUT /api/v2/alert/rules/{id} - 更新告警
- [ ] DELETE /api/v2/alert/rules/{id} - 删除告警
- [ ] POST /api/v2/alert/rules/{id}/enable - 启用告警
- [ ] POST /api/v2/alert/rules/{id}/disable - 禁用告警
- [ ] GET /api/v2/alert/history - 查询告警历史
---
#### 5.3 订阅 API0.5 天)
**文件**: `backend/app/api/v2/subscription.py`
**任务详情**:
- [ ] POST /api/v2/subscription - 创建订阅
- [ ] GET /api/v2/subscription/{id} - 查询订阅
- [ ] PUT /api/v2/subscription/{id} - 更新订阅
- [ ] DELETE /api/v2/subscription/{id} - 取消订阅
---
#### 5.4 质量 API0.5 天)
**文件**: `backend/app/api/v2/quality.py`
**任务详情**:
- [ ] GET /api/v2/quality/score - 查询质量评分
- [ ] GET /api/v2/quality/issues - 查询问题列表
- [ ] POST /api/v2/quality/rules - 创建监控规则
- [ ] PUT /api/v2/quality/rules/{id} - 更新监控规则
- [ ] GET /api/v2/quality/history - 查询监控历史
---
### 6. 数据库迁移0.5 天)
#### 6.1 创建新表0.5 天)
**文件**: `backend/app/db/migrations_v2_1.py`
**任务详情**:
- [ ] 创建 alert_rule 表
- [ ] 创建 alert_history 表
- [ ] 创建 subscription 表
- [ ] 创建 quality_rule 表
- [ ] 创建 quality_log 表
- [ ] 编写迁移脚本
- [ ] 编写回滚脚本
**验收标准**:
- [ ] 迁移脚本可执行
- [ ] 回滚脚本可执行
- [ ] 数据不丢失
---
## 🟠 P1 - 前端开发5 天)
### 7. 告警管理页面2 天)
**文件**: `frontend/src/views/alert/`
**任务详情**:
- [ ] 告警列表页面
- [ ] 创建告警表单
- [ ] 编辑告警表单
- [ ] 告警历史页面
- [ ] 告警统计图表
**页面设计**:
```
告警管理
├── 告警列表(表格)
│ ├── 名称
│ ├── 品种
│ ├── 类型
│ ├── 条件
│ ├── 状态(启用/禁用)
│ └── 操作(编辑/删除/启用/禁用)
├── 创建告警(表单)
│ ├── 告警名称
│ ├── 品种选择
│ ├── 告警类型
│ ├── 触发条件
│ ├── 通知渠道
│ └── 生效时间
└── 告警历史(表格)
├── 触发时间
├── 告警名称
├── 触发值
└── 通知状态
```
---
### 8. 数据质量 Dashboard2 天)
**文件**: `frontend/src/views/quality/`
**任务详情**:
- [ ] 质量概览卡片4 个评分)
- [ ] 问题统计图表
- [ ] 数据源状态表格
- [ ] 质量趋势图表
- [ ] 监控规则管理
**页面设计**:
```
数据质量 Dashboard
├── 质量概览
│ ├── 完整性评分98.5%
│ ├── 准确性评分99.8%
│ ├── 及时性评分97.2%
│ └── 一致性评分100%
├── 问题统计24 小时)
│ ├── 严重问题0
│ ├── 警告问题3
│ └── 提示信息12
├── 质量趋势(折线图)
└── 数据源状态
├── amazingData: 正常
├── Tushare: 正常
└── 东方财富:延迟
```
---
### 9. WebSocket 测试工具1 天)
**文件**: `frontend/src/views/tools/websocket-tester.vue`
**任务详情**:
- [ ] 连接管理(连接/断开)
- [ ] 订阅管理(订阅/取消)
- [ ] 消息日志(接收/发送)
- [ ] 统计信息(延迟/丢包)
---
## 🟡 P2 - 测试5 天)
### 10. WebSocket 测试1 天)
**文件**: `tests/test_websocket.py`
**任务详情**:
- [ ] 连接测试(正常/异常)
- [ ] 认证测试(有效/无效 Token
- [ ] 订阅测试(订阅/取消)
- [ ] 推送测试(行情/K线
- [ ] 性能测试1000 并发)
- [ ] 稳定性测试24 小时)
---
### 11. 告警功能测试1 天)
**文件**: `tests/test_alert.py`
**任务详情**:
- [ ] 规则创建测试
- [ ] 规则计算测试
- [ ] 告警触发测试
- [ ] 通知发送测试
- [ ] 性能测试1000 规则)
---
### 12. 数据订阅测试1 天)
**文件**: `tests/test_subscription.py`
**任务详情**:
- [ ] 订阅创建测试
- [ ] 消息发布测试
- [ ] 消息消费测试
- [ ] 消费者组测试
- [ ] 性能测试100 主题)
---
### 13. 质量监控测试1 天)
**文件**: `tests/test_quality.py`
**任务详情**:
- [ ] 完整性检测测试
- [ ] 准确性检测测试
- [ ] 及时性检测测试
- [ ] 一致性检测测试
- [ ] 告警触发测试
---
### 14. 性能测试1 天)
**文件**: `tests/test_performance.py`
**任务详情**:
- [ ] WebSocket 压测Locust
- [ ] 告警引擎压测
- [ ] 订阅服务压测
- [ ] 系统稳定性测试
---
## 📝 P3 - 文档2 天)
### 15. API 文档更新1 天)
**文件**: `docs/api_v2.md`
**任务详情**:
- [ ] WebSocket API 文档
- [ ] 告警 API 文档
- [ ] 订阅 API 文档
- [ ] 质量 API 文档
- [ ] Swagger 注解更新
---
### 16. 用户手册更新1 天)
**文件**: `docs/user_guide.md`
**任务详情**:
- [ ] WebSocket 使用指南
- [ ] 告警配置指南
- [ ] 数据订阅指南
- [ ] 质量监控指南
- [ ] 常见问题 FAQ
---
## 📅 时间计划
### 第 1 周2026-04-04 ~ 2026-04-07
- [ ] 需求评审04-04
- [ ] 架构设计04-05
- [ ] WebSocket 开发04-06-04-07
### 第 2 周2026-04-08 ~ 2026-04-11
- [ ] 告警系统开发04-08-04-09
- [ ] 数据订阅开发04-10
- [ ] 质量监控开发04-11
- [ ] API 接口开发04-11
### 第 3 周2026-04-12 ~ 2026-04-15
- [ ] 前端开发04-12-04-13
- [ ] 测试04-13-04-14
- [ ] 文档04-14
- [ ] 验收上线04-15
---
## 📊 进度跟踪
| 日期 | 计划完成 | 实际完成 | 进度 | 备注 |
|------|----------|----------|------|------|
| 04-04 | 需求评审 | - | - | - |
| 04-05 | 架构设计 | - | - | - |
| 04-06 | WebSocket 连接 | - | - | - |
| 04-07 | WebSocket 推送 | - | - | - |
| 04-08 | 告警规则引擎 | - | - | - |
| 04-09 | 告警通知服务 | - | - | - |
| 04-10 | 数据订阅服务 | - | - | - |
| 04-11 | 质量监控服务 | - | - | - |
| 04-12 | 前端开发 | - | - | - |
| 04-13 | 测试 | - | - | - |
| 04-14 | 验收 | - | - | - |
| 04-15 | 上线 | - | - | - |
---
**创建人**: Agent Coordinator
**创建日期**: 2026-04-03
**状态**: 规划中
**下一步**: 分配任务 → 开发实施 → 进度跟踪