You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
23 KiB
23 KiB
A股智投分析平台 - 后端实现文档
一、技术选型
1.1 推荐方案: Node.js + Express
技术栈:
- Node.js 20.x LTS
- Express 4.x
- TypeScript 5.x
- Prisma ORM
- Socket.io (WebSocket)
- Redis (缓存)
- MySQL 8.0 (数据库)
1.2 备选方案: Python + FastAPI
技术栈:
- Python 3.11
- FastAPI
- SQLAlchemy
- WebSockets
- Celery (定时任务)
- Redis
- PostgreSQL
二、项目结构
backend/
├── src/
│ ├── config/ # 配置文件
│ │ ├── database.ts # 数据库配置
│ │ ├── redis.ts # Redis配置
│ │ └── constants.ts # 常量定义
│ │
│ ├── controllers/ # 控制器层
│ │ ├── marketController.ts
│ │ ├── sectorController.ts
│ │ ├── stockController.ts
│ │ └── userController.ts
│ │
│ ├── services/ # 业务逻辑层
│ │ ├── marketService.ts
│ │ ├── sectorService.ts
│ │ ├── stockService.ts
│ │ ├── dataSyncService.ts
│ │ └── calculationService.ts
│ │
│ ├── models/ # 数据模型层
│ │ ├── Stock.ts
│ │ ├── Sector.ts
│ │ ├── KLine.ts
│ │ └── User.ts
│ │
│ ├── routes/ # 路由定义
│ │ ├── marketRoutes.ts
│ │ ├── sectorRoutes.ts
│ │ ├── stockRoutes.ts
│ │ └── userRoutes.ts
│ │
│ ├── middleware/ # 中间件
│ │ ├── auth.ts # 认证中间件
│ │ ├── errorHandler.ts # 错误处理
│ │ ├── rateLimiter.ts # 限流
│ │ └── logger.ts # 日志
│ │
│ ├── utils/ # 工具函数
│ │ ├── logger.ts
│ │ ├── validator.ts
│ │ ├── formatter.ts
│ │ └── maCalculator.ts # 均线计算
│ │
│ ├── websocket/ # WebSocket服务
│ │ └── stockSocket.ts
│ │
│ ├── jobs/ # 定时任务
│ │ ├── syncMarketData.ts
│ │ ├── calculateMomentum.ts
│ │ └── updateRankings.ts
│ │
│ ├── types/ # 类型定义
│ │ └── index.ts
│ │
│ └── app.ts # 应用入口
│
├── prisma/ # Prisma ORM
│ └── schema.prisma
│
├── tests/ # 测试文件
│ ├── unit/
│ └── integration/
│
├── scripts/ # 脚本文件
│ └── init-db.ts
│
├── .env # 环境变量
├── .env.example # 环境变量示例
├── Dockerfile
├── docker-compose.yml
├── package.json
├── tsconfig.json
└── README.md
三、核心服务实现
3.1 市场数据服务
文件: src/services/marketService.ts
import { PrismaClient } from '@prisma/client';
import Redis from 'ioredis';
// Shared Prisma client used by the service classes in this document.
const prisma = new PrismaClient();
// Shared Redis connection used as the services' cache. The fallback keeps the
// constructor argument a plain string under `strict` when REDIS_URL is unset;
// it matches the REDIS_URL value shown in the .env section.
const redis = new Redis(process.env.REDIS_URL ?? 'redis://localhost:6379');
/**
 * Market-wide read queries: index list, advance/decline counts and the
 * change-percent histogram. Each result is cached in Redis for 60 seconds.
 * Uses the module-level `prisma` and `redis` clients.
 */
export class MarketService {
  /** Returns all market indices ordered by `sortOrder` ascending. */
  async getMarketIndices() {
    return this.remember('market:indices', 60, () =>
      prisma.marketIndex.findMany({
        orderBy: { sortOrder: 'asc' }
      })
    );
  }

  /**
   * Returns `{ up, down, flat }`: how many quotes from the last five minutes
   * have a positive, negative or zero change percent.
   */
  async getUpDownStats() {
    return this.remember('market:updown:stats', 60, async () => {
      // NOTE(review): `_count.code` assumes the quote model has a `code`
      // column; the sync service writes `stockCode` — confirm against the schema.
      const groups = await prisma.stockQuote.groupBy({
        by: ['changePercent'],
        _count: { code: true },
        where: { quoteTime: this.recentWindow() }
      });

      let up = 0;
      let down = 0;
      let flat = 0;
      for (const group of groups) {
        if (group.changePercent > 0) {
          up += group._count.code;
        } else if (group.changePercent < 0) {
          down += group._count.code;
        } else if (group.changePercent === 0) {
          flat += group._count.code;
        }
      }
      return { up, down, flat };
    });
  }

  /**
   * Returns one `{ range, min, max, count }` entry per change-percent bucket,
   * counting quotes from the last five minutes. Buckets are half-open:
   * `min <= changePercent < max`.
   */
  async getPriceDistribution() {
    return this.remember('market:price:distribution', 60, () => {
      const buckets = [
        { range: '<-7%', min: -100, max: -7 },
        { range: '-7~-5%', min: -7, max: -5 },
        { range: '-5~-3%', min: -5, max: -3 },
        { range: '-3~0%', min: -3, max: 0 },
        { range: '0~3%', min: 0, max: 3 },
        { range: '3~5%', min: 3, max: 5 },
        { range: '5~7%', min: 5, max: 7 },
        { range: '>7%', min: 7, max: 100 }
      ];

      // One count query per bucket, all issued concurrently.
      const pending = buckets.map(async bucket => {
        const count = await prisma.stockQuote.count({
          where: {
            changePercent: { gte: bucket.min, lt: bucket.max },
            quoteTime: this.recentWindow()
          }
        });
        return { ...bucket, count };
      });
      return Promise.all(pending);
    });
  }

  /** Prisma date filter selecting rows stamped within the last five minutes. */
  private recentWindow() {
    return { gte: new Date(Date.now() - 5 * 60 * 1000) };
  }

  /**
   * Cache-aside helper: returns the parsed Redis entry for `key` when one
   * exists, otherwise runs `load`, stores its JSON for `ttlSeconds` and
   * returns the fresh value.
   */
  private async remember<T>(key: string, ttlSeconds: number, load: () => Promise<T>) {
    const hit = await redis.get(key);
    if (hit) {
      return JSON.parse(hit);
    }
    const fresh = await load();
    await redis.setex(key, ttlSeconds, JSON.stringify(fresh));
    return fresh;
  }
}
3.2 版块数据服务
文件: src/services/sectorService.ts
/**
 * Sector-level queries: momentum ranking, rank history and per-sector stock
 * momentum. Uses the module-level `prisma` and `redis` clients.
 */
export class SectorService {
  /**
   * Returns every sector with its latest change percent, a momentum score
   * and a 1-based rank (highest score first). Cached for 60 seconds.
   */
  async getSectorsWithMomentum() {
    const cacheKey = 'sectors:momentum';
    const cached = await redis.get(cacheKey);
    if (cached) {
      return JSON.parse(cached);
    }

    const sectors = await prisma.sector.findMany({
      include: {
        quotes: {
          orderBy: { quoteTime: 'desc' },
          take: 1
        }
      }
    });

    // Score each sector from its newest quote; rank is filled in after sorting.
    const sectorsWithMomentum = sectors.map(sector => {
      const latestQuote = sector.quotes[0];
      const momentumScore = this.calculateMomentumScore(sector);
      return {
        ...sector,
        changePercent: latestQuote?.changePercent || 0,
        momentumScore,
        rank: 0
      };
    });

    // Highest momentum first. Scores are always finite (see volumeRatio), so
    // the comparator is well defined.
    sectorsWithMomentum.sort((a, b) => b.momentumScore - a.momentumScore);
    sectorsWithMomentum.forEach((sector, index) => {
      sector.rank = index + 1;
    });

    await redis.setex(cacheKey, 60, JSON.stringify(sectorsWithMomentum));
    return sectorsWithMomentum;
  }

  /**
   * Scores a sector in [0, 100] from its newest quote (`sector.quotes[0]`).
   * Starts at 50; returns 50 when the sector has no quote.
   */
  private calculateMomentumScore(sector: any): number {
    const latestQuote = sector.quotes[0];
    if (!latestQuote) return 50;

    let score = 50;
    // Price contribution: 3 points per percent, clamped to [-15, +15].
    score += Math.min(Math.max(latestQuote.changePercent * 3, -15), 15);
    // Volume contribution: 10 points per unit of ratio above 1, at most +20
    // (and no lower than -10, since the ratio cannot go below 0).
    score += Math.min((this.volumeRatio(latestQuote) - 1) * 10, 20);

    return Math.min(Math.max(score, 0), 100);
  }

  /**
   * Returns `{ date, rank, momentumScore }` rows for one sector over the last
   * `days` days, oldest first. Cached for 300 seconds per (sector, days) pair.
   */
  async getSectorRankHistory(sectorCode: string, days: number = 30) {
    const cacheKey = `sector:${sectorCode}:rank:history:${days}`;
    const cached = await redis.get(cacheKey);
    if (cached) {
      return JSON.parse(cached);
    }

    const history = await prisma.sectorQuote.findMany({
      where: {
        sectorCode,
        quoteTime: {
          gte: new Date(Date.now() - days * 24 * 60 * 60 * 1000)
        }
      },
      orderBy: { quoteTime: 'asc' },
      select: {
        quoteTime: true,
        rank: true,
        momentumScore: true
      }
    });

    const result = history.map(h => ({
      // toISOString() is UTC, so `date` is the UTC calendar day of the quote.
      date: h.quoteTime.toISOString().split('T')[0],
      rank: h.rank,
      momentumScore: h.momentumScore
    }));

    await redis.setex(cacheKey, 300, JSON.stringify(result));
    return result;
  }

  /**
   * Returns the stocks of one sector, each merged with its newest quote and
   * a momentum score, sorted by score descending. Not cached.
   */
  async getSectorMomentumStocks(sectorCode: string) {
    const stocks = await prisma.stock.findMany({
      where: { sectorCode },
      include: {
        quotes: {
          orderBy: { quoteTime: 'desc' },
          take: 1
        }
      }
    });

    return stocks
      .map(stock => ({
        ...stock,
        // Quote fields overwrite same-named stock fields.
        ...stock.quotes[0],
        momentumScore: this.calculateStockMomentum(stock)
      }))
      .sort((a, b) => b.momentumScore - a.momentumScore);
  }

  /**
   * Scores a stock in [0, 100] from its newest quote (`stock.quotes[0]`).
   * Starts at 50; returns 50 when the stock has no quote.
   */
  private calculateStockMomentum(stock: any): number {
    const quote = stock.quotes[0];
    if (!quote) return 50;

    let score = 50;
    // Price contribution: 4 points per percent, clamped to [-20, +20].
    score += Math.min(Math.max(quote.changePercent * 4, -20), 20);
    // Volume contribution: 15 points per unit of ratio above 1, at most +25.
    score += Math.min((this.volumeRatio(quote) - 1) * 15, 25);

    return Math.min(Math.max(score, 0), 100);
  }

  /**
   * Ratio of a quote's volume to its average volume, falling back to the
   * volume itself when no average is available. Returns the neutral value 1
   * whenever the division is not finite (for example 0 / 0 on a suspended
   * instrument) so a single bad quote cannot turn a score into NaN.
   */
  private volumeRatio(quote: any): number {
    const volume = Number(quote.volume);
    const baseline = Number(quote.avgVolume) || volume;
    const ratio = volume / baseline;
    return Number.isFinite(ratio) ? ratio : 1;
  }
}
3.3 股票数据服务
文件: src/services/stockService.ts
/**
 * Stock-level queries: search, detail with technical indicators, and K-line
 * series with moving averages. Uses the module-level `prisma` and `redis`
 * clients.
 */
export class StockService {
  /**
   * Returns up to 10 stocks whose name or code contains `keyword`, each
   * merged with its newest quote. Not cached.
   */
  async searchStocks(keyword: string) {
    const stocks = await prisma.stock.findMany({
      where: {
        OR: [
          { name: { contains: keyword } },
          { code: { contains: keyword } }
        ]
      },
      take: 10,
      include: {
        quotes: {
          orderBy: { quoteTime: 'desc' },
          take: 1
        }
      }
    });

    return stocks.map(s => ({
      ...s,
      ...s.quotes[0]
    }));
  }

  /**
   * Returns one stock merged with its newest quote and the `macd` / `kdj` /
   * `rsi` indicator objects, or `null` when the code is unknown.
   * Cached for 60 seconds.
   */
  async getStockDetail(code: string) {
    const cacheKey = `stock:${code}:detail`;
    const cached = await redis.get(cacheKey);
    if (cached) {
      return JSON.parse(cached);
    }

    const stock = await prisma.stock.findUnique({
      where: { code },
      include: {
        quotes: {
          orderBy: { quoteTime: 'desc' },
          take: 1
        }
      }
    });
    if (!stock) return null;

    // Newest-first daily bars; the indicator helpers reorder as needed.
    const klines = await prisma.kLineData.findMany({
      where: { stockCode: code, period: 'day' },
      orderBy: { date: 'desc' },
      take: 60
    });

    const indicators = this.calculateIndicators(klines);
    const result = {
      ...stock,
      ...stock.quotes[0],
      ...indicators
    };

    await redis.setex(cacheKey, 60, JSON.stringify(result));
    return result;
  }

  /**
   * Returns the latest `days` bars of the given period in chronological
   * order, each extended with the moving averages that have enough history.
   * Cached for 300 seconds per (code, period, days).
   */
  async getKLineData(code: string, period: string = 'day', days: number = 60) {
    const cacheKey = `stock:${code}:kline:${period}:${days}`;
    const cached = await redis.get(cacheKey);
    if (cached) {
      return JSON.parse(cached);
    }

    const klines = await prisma.kLineData.findMany({
      where: {
        stockCode: code,
        period
      },
      orderBy: { date: 'desc' },
      take: days
    });

    // The query is newest-first; averages need oldest-first. Copy before
    // reversing so the query result itself is left untouched.
    const klinesWithMA = this.calculateMA([...klines].reverse());

    await redis.setex(cacheKey, 300, JSON.stringify(klinesWithMA));
    return klinesWithMA;
  }

  /**
   * Adds `ma5`/`ma10`/`ma20`/`ma30`/`ma60` (rounded to 2 decimals) to each bar
   * of a chronologically ordered series. A key is omitted while fewer than
   * `period` bars are available up to that point.
   */
  private calculateMA(klines: any[]) {
    const periods = [5, 10, 20, 30, 60];

    return klines.map((kline, index) => {
      const ma: Record<string, number> = {};

      for (const period of periods) {
        if (index >= period - 1) {
          // Number() keeps the sum numeric even if `close` arrives as a
          // non-number (e.g. a decimal object) — the schema is not shown here.
          const sum = klines
            .slice(index - period + 1, index + 1)
            .reduce((acc, k) => acc + Number(k.close), 0);
          ma[`ma${period}`] = Number((sum / period).toFixed(2));
        }
      }

      return { ...kline, ...ma };
    });
  }

  /** Bundles the MACD, KDJ and RSI results for a newest-first bar series. */
  private calculateIndicators(klines: any[]) {
    return {
      macd: this.calculateMACD(klines),
      kdj: this.calculateKDJ(klines),
      rsi: this.calculateRSI(klines)
    };
  }

  /**
   * Computes the latest MACD(12, 26, 9) values from a newest-first bar
   * series, rounded to 3 decimals. Returns all zeros for an empty series
   * (the same values a single-bar series produces) instead of throwing.
   */
  private calculateMACD(klines: any[]) {
    if (klines.length === 0) {
      return { dif: 0, dea: 0, macd: 0 };
    }

    // Reverse into chronological order for the exponential averages.
    const closes = klines.map(k => Number(k.close)).reverse();
    const ema12 = this.EMA(closes, 12);
    const ema26 = this.EMA(closes, 26);
    const dif = ema12.map((v, i) => v - ema26[i]);
    const dea = this.EMA(dif, 9);
    const macd = dif.map((v, i) => (v - dea[i]) * 2);

    const last = closes.length - 1;
    return {
      dif: Number(dif[last].toFixed(3)),
      dea: Number(dea[last].toFixed(3)),
      macd: Number(macd[last].toFixed(3))
    };
  }

  /**
   * KDJ indicator.
   * NOTE: placeholder — the calculation is not implemented yet; this returns
   * fixed sample values regardless of `klines` and `n`.
   */
  private calculateKDJ(klines: any[], n: number = 9) {
    return { k: 75.2, d: 68.5, j: 88.6 };
  }

  /**
   * RSI indicator.
   * NOTE: placeholder — the calculation is not implemented yet; this returns
   * fixed sample values regardless of `klines`.
   */
  private calculateRSI(klines: any[]) {
    return { rsi6: 72.5, rsi12: 68.3, rsi24: 65.1 };
  }

  /**
   * Exponential moving average with smoothing factor 2 / (n + 1), seeded
   * with the first sample. Returns an array of the same length as `data`
   * (empty for empty input).
   */
  private EMA(data: number[], n: number): number[] {
    if (data.length === 0) return [];

    const k = 2 / (n + 1);
    const ema: number[] = [data[0]];

    for (let i = 1; i < data.length; i++) {
      ema.push(data[i] * k + ema[i - 1] * (1 - k));
    }

    return ema;
  }
}
3.4 数据同步服务
文件: src/services/dataSyncService.ts
import axios from 'axios';
/**
 * Pulls market data from the AKShare HTTP service and writes it to the
 * database through the module-level `prisma` client. Both sync methods are
 * best-effort: failures are logged and never rethrown.
 */
export class DataSyncService {
  // AKShare service address; AKSHARE_URL (see the .env section) overrides
  // the local default.
  private akshareBaseUrl = process.env.AKSHARE_URL ?? 'http://localhost:8000';

  /**
   * Fetches the current A-share spot quotes and stores one snapshot row per
   * quote, all stamped with the same `quoteTime`.
   */
  async syncRealTimeQuotes() {
    try {
      const response = await axios.get(`${this.akshareBaseUrl}/stock_zh_a_spot`);
      const quotes = response.data;
      if (!Array.isArray(quotes)) {
        throw new Error('Unexpected quote payload: expected an array');
      }

      if (quotes.length > 0) {
        // One timestamp for the whole batch so a snapshot can be selected as a unit.
        const quoteTime = new Date();
        // A single bulk insert instead of one INSERT statement per quote.
        await prisma.stockQuote.createMany({
          data: quotes.map((quote: any) => ({
            stockCode: quote.code,
            price: quote.price,
            open: quote.open,
            high: quote.high,
            low: quote.low,
            preClose: quote.pre_close,
            volume: quote.volume,
            turnover: quote.turnover,
            changePercent: quote.change_percent,
            quoteTime
          }))
        });
      }

      console.log(`Synced ${quotes.length} quotes`);
    } catch (error) {
      console.error('Sync quotes failed:', error);
    }
  }

  /**
   * Fetches historical bars for one stock and upserts them keyed by
   * (stockCode, period, date), inside a single transaction.
   *
   * @param stockCode Stock symbol passed to AKShare as `symbol`.
   * @param period    Stored period name; `'day'` is sent to AKShare as `'daily'`.
   * @param startDate First day to request, as `YYYYMMDD`. Defaults to the
   *                  original fixed value `'20230101'`.
   */
  async syncKLineData(stockCode: string, period: string = 'day', startDate: string = '20230101') {
    try {
      const response = await axios.get(
        `${this.akshareBaseUrl}/stock_zh_a_hist`,
        {
          params: {
            symbol: stockCode,
            period: period === 'day' ? 'daily' : period,
            start_date: startDate,
            // Today's UTC calendar date, formatted as YYYYMMDD.
            end_date: new Date().toISOString().split('T')[0].replace(/-/g, '')
          }
        }
      );
      const klines = response.data;
      if (!Array.isArray(klines)) {
        throw new Error('Unexpected kline payload: expected an array');
      }

      await prisma.$transaction(
        klines.map((k: any) => {
          const date = new Date(k.date);
          const bar = {
            open: k.open,
            high: k.high,
            low: k.low,
            close: k.close,
            volume: k.volume
          };
          return prisma.kLineData.upsert({
            where: {
              stockCode_period_date: { stockCode, period, date }
            },
            update: bar,
            create: { stockCode, period, date, ...bar }
          });
        })
      );

      console.log(`Synced ${klines.length} klines for ${stockCode}`);
    } catch (error) {
      console.error(`Sync kline failed for ${stockCode}:`, error);
    }
  }
}
四、WebSocket 实现
文件: src/websocket/stockSocket.ts
import { Server } from 'socket.io';
/**
 * Socket.IO gateway: clients join and leave named channels (rooms), and the
 * server pushes quotes to the `stock:<code>` and `sector:<code>` rooms.
 */
export class StockSocket {
  private io: Server;

  /** Attaches a Socket.IO server to `server` and registers the handlers. */
  constructor(server: any) {
    this.io = new Server(server, {
      cors: {
        // NOTE(review): '*' accepts connections from any origin; restrict
        // this before exposing the service publicly.
        origin: '*',
        methods: ['GET', 'POST']
      }
    });

    this.setupHandlers();
  }

  /** Registers the per-connection subscribe/unsubscribe/disconnect handlers. */
  private setupHandlers() {
    this.io.on('connection', (socket) => {
      console.log('Client connected:', socket.id);

      // Subscribe: join every valid channel in the payload.
      socket.on('subscribe', (channels: unknown) => {
        for (const channel of this.toChannelList(channels)) {
          socket.join(channel);
          console.log(`Client ${socket.id} subscribed to ${channel}`);
        }
      });

      // Unsubscribe: leave every valid channel in the payload.
      socket.on('unsubscribe', (channels: unknown) => {
        for (const channel of this.toChannelList(channels)) {
          socket.leave(channel);
          console.log(`Client ${socket.id} unsubscribed from ${channel}`);
        }
      });

      socket.on('disconnect', () => {
        console.log('Client disconnected:', socket.id);
      });
    });
  }

  /**
   * Normalizes a client-supplied payload into a list of channel names.
   * The payload is untrusted: anything that is not an array yields an empty
   * list, and entries that are not non-empty strings are dropped, so a
   * malformed message cannot throw inside the event handler.
   */
  private toChannelList(payload: unknown): string[] {
    if (!Array.isArray(payload)) {
      return [];
    }
    return payload.filter(
      (entry): entry is string => typeof entry === 'string' && entry.length > 0
    );
  }

  /** Emits a `quote` event carrying `data` to the room `stock:<stockCode>`. */
  broadcastStockQuote(stockCode: string, data: any) {
    this.io.to(`stock:${stockCode}`).emit('quote', {
      channel: `stock:${stockCode}`,
      type: 'quote',
      data
    });
  }

  /** Emits a `quote` event carrying `data` to the room `sector:<sectorCode>`. */
  broadcastSectorQuote(sectorCode: string, data: any) {
    this.io.to(`sector:${sectorCode}`).emit('quote', {
      channel: `sector:${sectorCode}`,
      type: 'quote',
      data
    });
  }
}
五、定时任务
文件: src/jobs/syncMarketData.ts
import cron from 'node-cron';
import { DataSyncService } from '../services/dataSyncService';
const dataSyncService = new DataSyncService();

/**
 * Reports whether `now` falls inside an A-share trading session:
 * Monday to Friday, 09:30-11:30 or 13:00-15:00.
 * NOTE(review): uses the host's local clock, so this assumes the server runs
 * in the exchange's time zone — confirm. Public holidays are not excluded.
 */
function isTradingTime(now: Date): boolean {
  const weekday = now.getDay();
  if (weekday === 0 || weekday === 6) {
    return false; // Sunday / Saturday: market closed
  }

  const minuteOfDay = now.getHours() * 60 + now.getMinutes();
  const inMorningSession = minuteOfDay >= 9 * 60 + 30 && minuteOfDay <= 11 * 60 + 30;
  const inAfternoonSession = minuteOfDay >= 13 * 60 && minuteOfDay < 15 * 60;
  return inMorningSession || inAfternoonSession;
}

// Guards against overlapping runs: a sync that takes longer than the
// 3-second interval would otherwise pile up concurrent requests and inserts.
let quoteSyncRunning = false;

// Sync real-time quotes every 3 seconds during trading hours.
cron.schedule('*/3 * * * * *', async () => {
  if (quoteSyncRunning || !isTradingTime(new Date())) {
    return;
  }

  quoteSyncRunning = true;
  try {
    await dataSyncService.syncRealTimeQuotes();
  } finally {
    quoteSyncRunning = false;
  }
});

// Sync daily K-line data for every stock once per hour, one stock at a time.
// NOTE(review): `prisma` is not imported in this snippet — confirm it is in
// scope in the real file.
cron.schedule('0 * * * *', async () => {
  try {
    const stocks = await prisma.stock.findMany();
    for (const stock of stocks) {
      await dataSyncService.syncKLineData(stock.code);
    }
  } catch (error) {
    // Without this, a failed stock lookup would surface as an unhandled rejection.
    console.error('Hourly kline sync failed:', error);
  }
});

// Recalculate sector rankings at 15:00 on weekdays (market close).
// NOTE(review): `sectorService` is not imported here and
// `calculateAndUpdateRankings` is not shown on SectorService in this
// document — confirm both exist.
cron.schedule('0 15 * * 1-5', async () => {
  try {
    await sectorService.calculateAndUpdateRankings();
  } catch (error) {
    console.error('Sector ranking update failed:', error);
  }
});
六、环境变量
文件: .env
# Server settings
PORT=3000
NODE_ENV=production
# Database settings
DATABASE_URL=mysql://user:password@localhost:3306/aguzhitou
# Redis settings
REDIS_URL=redis://localhost:6379
# JWT settings
# NOTE(review): placeholder value — replace with a strong random secret before deploying
JWT_SECRET=your-secret-key
JWT_EXPIRES_IN=7d
# AKShare settings (base URL of the AKShare HTTP service)
AKSHARE_URL=http://localhost:8000
# Logging settings
LOG_LEVEL=info
七、部署脚本
文件: docker-compose.yml
# Application stack: API server, MySQL, Redis and an nginx reverse proxy.
version: '3.8'

services:
  app:
    build: .
    ports:
      - "3000:3000"
    environment:
      # The root password can be overridden with MYSQL_ROOT_PASSWORD; the
      # default matches the mysql service below.
      - DATABASE_URL=mysql://root:${MYSQL_ROOT_PASSWORD:-rootpass}@mysql:3306/aguzhitou
      - REDIS_URL=redis://redis:6379
      - JWT_SECRET=${JWT_SECRET}
    depends_on:
      - mysql
      - redis
    restart: always

  mysql:
    image: mysql:8.0
    environment:
      # NOTE(review): set MYSQL_ROOT_PASSWORD in the environment for any
      # non-local deployment; "rootpass" is only a development default.
      - MYSQL_ROOT_PASSWORD=${MYSQL_ROOT_PASSWORD:-rootpass}
      - MYSQL_DATABASE=aguzhitou
    volumes:
      - mysql_data:/var/lib/mysql
    # NOTE(review): publishes the database on the host; drop this mapping
    # if only the app container needs access.
    ports:
      - "3306:3306"
    restart: always

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    # NOTE(review): publishes Redis on the host; drop this mapping
    # if only the app container needs access.
    ports:
      - "6379:6379"
    restart: always

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - app
    restart: always

volumes:
  mysql_data:
  redis_data:
八、测试
// Sample test: checks the 5-period moving average on a six-bar series.
describe('StockService', () => {
  let stockService: StockService;

  beforeEach(() => {
    stockService = new StockService();
  });

  it('should calculate MA correctly', () => {
    const closes = [10, 11, 12, 13, 14, 15];
    const klines = closes.map((close) => ({ close }));

    // calculateMA is private; bracket access reaches it from the test.
    const withAverages = stockService['calculateMA'](klines);

    // The last bar averages the final five closes: (11+12+13+14+15)/5 = 13
    expect(withAverages[5].ma5).toBe(13);
  });
});
九、待实现清单
- 数据库表创建
- Prisma schema 定义
- API 路由实现
- WebSocket 服务
- 定时任务配置
- 单元测试
- Docker 部署
- CI/CD 配置
- 监控告警
- 日志收集