You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

23 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

A股智投分析平台 - 后端实现文档

一、技术选型

1.1 推荐方案: Node.js + Express

技术栈:
- Node.js 20.x LTS
- Express 4.x
- TypeScript 5.x
- Prisma ORM
- Socket.io (WebSocket)
- Redis (缓存)
- MySQL 8.0 (数据库)

1.2 备选方案: Python + FastAPI

技术栈:
- Python 3.11
- FastAPI
- SQLAlchemy
- WebSockets
- Celery (定时任务)
- Redis
- PostgreSQL

二、项目结构

backend/
├── src/
│   ├── config/              # 配置文件
│   │   ├── database.ts      # 数据库配置
│   │   ├── redis.ts         # Redis配置
│   │   └── constants.ts     # 常量定义
│   │
│   ├── controllers/         # 控制器层
│   │   ├── marketController.ts
│   │   ├── sectorController.ts
│   │   ├── stockController.ts
│   │   └── userController.ts
│   │
│   ├── services/            # 业务逻辑层
│   │   ├── marketService.ts
│   │   ├── sectorService.ts
│   │   ├── stockService.ts
│   │   ├── dataSyncService.ts
│   │   └── calculationService.ts
│   │
│   ├── models/              # 数据模型层
│   │   ├── Stock.ts
│   │   ├── Sector.ts
│   │   ├── KLine.ts
│   │   └── User.ts
│   │
│   ├── routes/              # 路由定义
│   │   ├── marketRoutes.ts
│   │   ├── sectorRoutes.ts
│   │   ├── stockRoutes.ts
│   │   └── userRoutes.ts
│   │
│   ├── middleware/          # 中间件
│   │   ├── auth.ts          # 认证中间件
│   │   ├── errorHandler.ts  # 错误处理
│   │   ├── rateLimiter.ts   # 限流
│   │   └── logger.ts        # 日志
│   │
│   ├── utils/               # 工具函数
│   │   ├── logger.ts
│   │   ├── validator.ts
│   │   ├── formatter.ts
│   │   └── maCalculator.ts  # 均线计算
│   │
│   ├── websocket/           # WebSocket服务
│   │   └── stockSocket.ts
│   │
│   ├── jobs/                # 定时任务
│   │   ├── syncMarketData.ts
│   │   ├── calculateMomentum.ts
│   │   └── updateRankings.ts
│   │
│   ├── types/               # 类型定义
│   │   └── index.ts
│   │
│   └── app.ts               # 应用入口
│
├── prisma/                  # Prisma ORM
│   └── schema.prisma
│
├── tests/                   # 测试文件
│   ├── unit/
│   └── integration/
│
├── scripts/                 # 脚本文件
│   └── init-db.ts
│
├── .env                     # 环境变量
├── .env.example             # 环境变量示例
├── Dockerfile
├── docker-compose.yml
├── package.json
├── tsconfig.json
└── README.md

三、核心服务实现

3.1 市场数据服务

文件: src/services/marketService.ts

import { PrismaClient } from '@prisma/client';
import Redis from 'ioredis';

// Shared Prisma client used by the service classes for all database access.
const prisma = new PrismaClient();
// Shared Redis connection used as the read-through cache.
// `process.env.REDIS_URL` is `string | undefined`; passing it straight through
// does not type-check under `strict`, so fall back explicitly to the address
// ioredis would use by default anyway.
const redis = new Redis(process.env.REDIS_URL ?? 'redis://localhost:6379');

/**
 * Read-side service for market-wide data: index list, advance/decline counts
 * and the change-percent distribution.
 *
 * Every public method is a read-through cache: it returns the Redis entry when
 * present, otherwise queries the database and stores the result for
 * `CACHE_TTL_SECONDS`.
 */
export class MarketService {
    /** Lifetime of every cache entry written by this service, in seconds. */
    private static readonly CACHE_TTL_SECONDS = 60;

    /** Only quotes newer than this window (5 minutes) are considered "current". */
    private static readonly QUOTE_WINDOW_MS = 5 * 60 * 1000;

    /**
     * Returns all market indices ordered by `sortOrder` ascending.
     */
    async getMarketIndices() {
        const cacheKey = 'market:indices';
        const cached = await redis.get(cacheKey);

        if (cached) {
            return JSON.parse(cached);
        }

        const indices = await prisma.marketIndex.findMany({
            orderBy: { sortOrder: 'asc' }
        });

        await redis.setex(cacheKey, MarketService.CACHE_TTL_SECONDS, JSON.stringify(indices));
        return indices;
    }

    /**
     * Returns how many stocks are currently up, down and flat.
     *
     * Counts are per stock (latest quote inside the window), not per quote row:
     * the quote table receives a fresh row for every stock on each sync, so
     * counting rows would count the same stock many times.
     *
     * @returns `{ up, down, flat }` stock counts.
     */
    async getUpDownStats() {
        const cacheKey = 'market:updown:stats';
        const cached = await redis.get(cacheKey);

        if (cached) {
            return JSON.parse(cached);
        }

        const changes = await this.loadLatestChangePercents();

        const result = { up: 0, down: 0, flat: 0 };
        for (const change of changes) {
            if (change > 0) {
                result.up += 1;
            } else if (change < 0) {
                result.down += 1;
            } else {
                result.flat += 1;
            }
        }

        await redis.setex(cacheKey, MarketService.CACHE_TTL_SECONDS, JSON.stringify(result));
        return result;
    }

    /**
     * Returns the number of stocks in each change-percent bucket.
     *
     * Buckets are half-open `[min, max)`. The first and last bucket are
     * open-ended so that extreme moves (for example beyond +100%) are still
     * counted; their `min`/`max` fields keep the nominal -100/100 values so
     * the response shape is unchanged.
     *
     * @returns One `{ range, min, max, count }` entry per bucket, in ascending order.
     */
    async getPriceDistribution() {
        const cacheKey = 'market:price:distribution';
        const cached = await redis.get(cacheKey);

        if (cached) {
            return JSON.parse(cached);
        }

        const ranges = [
            { range: '<-7%', min: -100, max: -7 },
            { range: '-7~-5%', min: -7, max: -5 },
            { range: '-5~-3%', min: -5, max: -3 },
            { range: '-3~0%', min: -3, max: 0 },
            { range: '0~3%', min: 0, max: 3 },
            { range: '3~5%', min: 3, max: 5 },
            { range: '5~7%', min: 5, max: 7 },
            { range: '>7%', min: 7, max: 100 }
        ];

        // One query for the whole snapshot instead of one COUNT per bucket.
        const changes = await this.loadLatestChangePercents();

        const lastIndex = ranges.length - 1;
        const distribution = ranges.map(r => ({ ...r, count: 0 }));
        for (const change of changes) {
            const bucket = ranges.findIndex((r, i) =>
                (i === 0 || change >= r.min) && (i === lastIndex || change < r.max)
            );
            if (bucket !== -1) {
                distribution[bucket].count += 1;
            }
        }

        await redis.setex(cacheKey, MarketService.CACHE_TTL_SECONDS, JSON.stringify(distribution));
        return distribution;
    }

    /**
     * Loads the change percent of the most recent quote of every stock that
     * has a quote inside the current window.
     *
     * Rows are ordered newest-first and de-duplicated on `stockCode`, so each
     * stock contributes exactly one value. Missing or non-numeric values are
     * dropped. Values are passed through `Number()` in case the column is
     * mapped to a non-primitive decimal type.
     */
    private async loadLatestChangePercents(): Promise<number[]> {
        const rows: Array<{ changePercent: unknown }> = await prisma.stockQuote.findMany({
            where: {
                quoteTime: {
                    gte: new Date(Date.now() - MarketService.QUOTE_WINDOW_MS)
                }
            },
            orderBy: { quoteTime: 'desc' },
            distinct: ['stockCode'],
            select: { changePercent: true }
        });

        return rows
            .map(row => (row.changePercent == null ? NaN : Number(row.changePercent)))
            .filter(value => Number.isFinite(value));
    }
}

3.2 板块数据服务

文件: src/services/sectorService.ts

/**
 * Read-side service for sector data: momentum ranking, rank history and the
 * momentum-sorted stock list of a sector.
 */
export class SectorService {
    /**
     * Returns every sector with its latest change percent, a momentum score
     * and a 1-based rank (rank 1 = highest score). Cached for 60 seconds.
     */
    async getSectorsWithMomentum() {
        const cacheKey = 'sectors:momentum';
        const cached = await redis.get(cacheKey);

        if (cached) {
            return JSON.parse(cached);
        }

        const sectors = await prisma.sector.findMany({
            include: {
                quotes: {
                    orderBy: { quoteTime: 'desc' },
                    take: 1
                }
            }
        });

        const sectorsWithMomentum = sectors
            .map(sector => ({
                ...sector,
                changePercent: sector.quotes[0]?.changePercent ?? 0,
                momentumScore: this.calculateMomentumScore(sector),
                rank: 0 // filled in below, once the order is known
            }))
            .sort((a, b) => b.momentumScore - a.momentumScore);

        sectorsWithMomentum.forEach((sector, index) => {
            sector.rank = index + 1;
        });

        await redis.setex(cacheKey, 60, JSON.stringify(sectorsWithMomentum));
        return sectorsWithMomentum;
    }

    /**
     * Momentum score of a sector in the range 0..100, based on its latest quote.
     *
     * Starts at 50, adds `changePercent * 3` clamped to ±15, then adds
     * `(volume / avgVolume - 1) * 10` capped at +20.
     *
     * @param sector Sector record with a `quotes` array (latest quote first).
     * @returns 50 when the sector has no quote.
     */
    private calculateMomentumScore(sector: any): number {
        const latestQuote = sector.quotes[0];
        if (!latestQuote) return 50;

        return this.scoreQuote(latestQuote, {
            changeWeight: 3,
            changeCap: 15,
            volumeWeight: 10,
            volumeCap: 20
        });
    }

    /**
     * Returns the stored rank and momentum score of a sector over the last
     * `days` days, oldest first. Cached for 300 seconds.
     *
     * @param sectorCode Sector identifier.
     * @param days Look-back window in days (default 30).
     * @returns `{ date, rank, momentumScore }` entries; `date` is the UTC
     *          calendar date (`YYYY-MM-DD`) of the quote time.
     */
    async getSectorRankHistory(sectorCode: string, days: number = 30) {
        const cacheKey = `sector:${sectorCode}:rank:history:${days}`;
        const cached = await redis.get(cacheKey);

        if (cached) {
            return JSON.parse(cached);
        }

        const history = await prisma.sectorQuote.findMany({
            where: {
                sectorCode,
                quoteTime: {
                    gte: new Date(Date.now() - days * 24 * 60 * 60 * 1000)
                }
            },
            orderBy: { quoteTime: 'asc' },
            select: {
                quoteTime: true,
                rank: true,
                momentumScore: true
            }
        });

        const result = history.map(h => ({
            date: h.quoteTime.toISOString().split('T')[0],
            rank: h.rank,
            momentumScore: h.momentumScore
        }));

        await redis.setex(cacheKey, 300, JSON.stringify(result));
        return result;
    }

    /**
     * Returns the stocks of a sector, each merged with its latest quote and a
     * momentum score, sorted by score descending. Not cached.
     *
     * @param sectorCode Sector identifier.
     */
    async getSectorMomentumStocks(sectorCode: string) {
        const stocks = await prisma.stock.findMany({
            where: { sectorCode },
            include: {
                quotes: {
                    orderBy: { quoteTime: 'desc' },
                    take: 1
                }
            }
        });

        return stocks
            .map(stock => ({
                ...stock,
                ...stock.quotes[0],
                momentumScore: this.calculateStockMomentum(stock)
            }))
            .sort((a, b) => b.momentumScore - a.momentumScore);
    }

    /**
     * Momentum score of a single stock in the range 0..100.
     *
     * Same formula as the sector score with stronger weights:
     * `changePercent * 4` clamped to ±20, volume term `* 15` capped at +25.
     *
     * @param stock Stock record with a `quotes` array (latest quote first).
     * @returns 50 when the stock has no quote.
     */
    private calculateStockMomentum(stock: any): number {
        const quote = stock.quotes[0];
        if (!quote) return 50;

        return this.scoreQuote(quote, {
            changeWeight: 4,
            changeCap: 20,
            volumeWeight: 15,
            volumeCap: 25
        });
    }

    /**
     * Shared scoring formula behind the sector and stock momentum scores.
     *
     * The volume term compares `volume` with `avgVolume` (or with `volume`
     * itself when no average is available). It is skipped when that baseline
     * is not positive: dividing by it would produce NaN (0/0), which would
     * poison the sort order and serialise as `null`.
     */
    private scoreQuote(
        quote: any,
        weights: { changeWeight: number; changeCap: number; volumeWeight: number; volumeCap: number }
    ): number {
        let score = 50;

        const change = Number(quote.changePercent);
        if (Number.isFinite(change)) {
            score += Math.min(
                Math.max(change * weights.changeWeight, -weights.changeCap),
                weights.changeCap
            );
        }

        const volume = Number(quote.volume);
        const baseline = Number(quote.avgVolume) || volume;
        if (Number.isFinite(volume) && baseline > 0) {
            score += Math.min((volume / baseline - 1) * weights.volumeWeight, weights.volumeCap);
        }

        return Math.min(Math.max(score, 0), 100);
    }
}

3.3 股票数据服务

文件: src/services/stockService.ts

/**
 * Read-side service for individual stocks: search, detail (latest quote plus
 * technical indicators) and K-line series with moving averages.
 */
export class StockService {
    /** Moving-average windows (in bars) attached to each K-line row as `ma<N>`. */
    private static readonly MA_PERIODS = [5, 10, 20, 30, 60];

    /**
     * Finds up to 10 stocks whose name or code contains `keyword`, each merged
     * with its latest quote.
     */
    async searchStocks(keyword: string) {
        const stocks = await prisma.stock.findMany({
            where: {
                OR: [
                    { name: { contains: keyword } },
                    { code: { contains: keyword } }
                ]
            },
            take: 10,
            include: {
                quotes: {
                    orderBy: { quoteTime: 'desc' },
                    take: 1
                }
            }
        });

        return stocks.map(s => ({
            ...s,
            ...s.quotes[0]
        }));
    }

    /**
     * Returns a stock merged with its latest quote and the MACD/KDJ/RSI
     * indicators computed from its last 60 daily K-lines. Cached for 60 seconds.
     *
     * @param code Stock code.
     * @returns The merged record, or `null` when the stock does not exist.
     */
    async getStockDetail(code: string) {
        const cacheKey = `stock:${code}:detail`;
        const cached = await redis.get(cacheKey);

        if (cached) {
            return JSON.parse(cached);
        }

        const stock = await prisma.stock.findUnique({
            where: { code },
            include: {
                quotes: {
                    orderBy: { quoteTime: 'desc' },
                    take: 1
                }
            }
        });

        if (!stock) return null;

        // Newest first; the indicator helpers reverse this themselves.
        const klines = await prisma.kLineData.findMany({
            where: { stockCode: code, period: 'day' },
            orderBy: { date: 'desc' },
            take: 60
        });

        const indicators = this.calculateIndicators(klines);

        const result = {
            ...stock,
            ...stock.quotes[0],
            ...indicators
        };

        await redis.setex(cacheKey, 60, JSON.stringify(result));
        return result;
    }

    /**
     * Returns the last `days` K-line rows of a stock, oldest first, each
     * extended with the moving averages that can be computed for it.
     * Cached for 300 seconds.
     *
     * Extra history (longest MA window minus one bar) is loaded in front of the
     * requested range and trimmed off again, so the returned rows carry their
     * long moving averages whenever enough history exists.
     *
     * @param code Stock code.
     * @param period K-line period (default `'day'`).
     * @param days Number of rows to return (default 60).
     */
    async getKLineData(code: string, period: string = 'day', days: number = 60) {
        const cacheKey = `stock:${code}:kline:${period}:${days}`;
        const cached = await redis.get(cacheKey);

        if (cached) {
            return JSON.parse(cached);
        }

        // Only pad for a positive row count; other values keep the plain query.
        const lookback = days > 0 ? Math.max(...StockService.MA_PERIODS) - 1 : 0;

        const klines = await prisma.kLineData.findMany({
            where: {
                stockCode: code,
                period
            },
            orderBy: { date: 'desc' },
            take: days + lookback
        });

        const withMA = this.calculateMA([...klines].reverse());
        const klinesWithMA = lookback > 0 ? withMA.slice(-days) : withMA;

        await redis.setex(cacheKey, 300, JSON.stringify(klinesWithMA));
        return klinesWithMA;
    }

    /**
     * Adds simple moving averages of `close` to each row.
     *
     * @param klines Rows ordered oldest first.
     * @returns A copy of each row plus `ma<N>` (rounded to 2 decimals) for every
     *          window N in `MA_PERIODS` that has N rows available up to and
     *          including that row.
     */
    private calculateMA(klines: any[]) {
        return klines.map((kline, index) => {
            const ma: Record<string, number> = {};

            for (const period of StockService.MA_PERIODS) {
                if (index < period - 1) continue;

                // Number() keeps the sum numeric even if `close` is not a primitive number.
                const sum = klines
                    .slice(index - period + 1, index + 1)
                    .reduce((acc, k) => acc + Number(k.close), 0);
                ma[`ma${period}`] = Number((sum / period).toFixed(2));
            }

            return { ...kline, ...ma };
        });
    }

    /**
     * Computes the latest MACD, KDJ and RSI values.
     *
     * @param klines Rows ordered newest first.
     */
    private calculateIndicators(klines: any[]) {
        return {
            macd: this.calculateMACD(klines),
            kdj: this.calculateKDJ(klines),
            rsi: this.calculateRSI(klines)
        };
    }

    /**
     * Latest MACD(12, 26, 9) values: `dif = EMA12 - EMA26`, `dea = EMA9(dif)`,
     * `macd = (dif - dea) * 2`, each rounded to 3 decimals.
     *
     * @param klines Rows ordered newest first.
     * @returns All zeros when there is no data.
     */
    private calculateMACD(klines: any[]) {
        if (klines.length === 0) {
            return { dif: 0, dea: 0, macd: 0 };
        }

        const closes = klines.map(k => Number(k.close)).reverse();
        const ema12 = this.EMA(closes, 12);
        const ema26 = this.EMA(closes, 26);
        const dif = ema12.map((v, i) => v - ema26[i]);
        const dea = this.EMA(dif, 9);
        const last = dif.length - 1;

        return {
            dif: Number(dif[last].toFixed(3)),
            dea: Number(dea[last].toFixed(3)),
            macd: Number(((dif[last] - dea[last]) * 2).toFixed(3))
        };
    }

    /**
     * Latest KDJ values, rounded to 2 decimals.
     *
     * For each bar, `RSV = (close - lowestLow) / (highestHigh - lowestLow) * 100`
     * over the trailing `n` bars (50 when that range is zero), then
     * `K = (2 * prevK + RSV) / 3`, `D = (2 * prevD + K) / 3`, `J = 3K - 2D`,
     * with K and D seeded at 50.
     *
     * @param klines Rows ordered newest first, each with `high`, `low`, `close`.
     * @param n RSV window in bars (default 9).
     * @returns `{ k: 50, d: 50, j: 50 }` when there is no data.
     */
    private calculateKDJ(klines: any[], n: number = 9) {
        const bars = [...klines].reverse(); // oldest first
        let k = 50;
        let d = 50;

        for (let i = 0; i < bars.length; i++) {
            const window = bars.slice(Math.max(0, i - n + 1), i + 1);
            const highest = Math.max(...window.map(b => Number(b.high)));
            const lowest = Math.min(...window.map(b => Number(b.low)));
            const span = highest - lowest;
            const rsv = span === 0 ? 50 : ((Number(bars[i].close) - lowest) / span) * 100;

            k = (2 * k + rsv) / 3;
            d = (2 * d + k) / 3;
        }

        return {
            k: Number(k.toFixed(2)),
            d: Number(d.toFixed(2)),
            j: Number((3 * k - 2 * d).toFixed(2))
        };
    }

    /**
     * Latest RSI values for the 6, 12 and 24 bar windows.
     *
     * @param klines Rows ordered newest first.
     */
    private calculateRSI(klines: any[]) {
        const closes = klines.map(k => Number(k.close)).reverse(); // oldest first

        return {
            rsi6: this.rsi(closes, 6),
            rsi12: this.rsi(closes, 12),
            rsi24: this.rsi(closes, 24)
        };
    }

    /**
     * RSI of the last close: smoothed average gain divided by smoothed average
     * absolute move, times 100, rounded to 2 decimals. Both averages use the
     * recursive smoothing `avg = (x + (n - 1) * prevAvg) / n`, seeded with the
     * first close-to-close change.
     *
     * @param closes Closing prices ordered oldest first.
     * @param n Smoothing window in bars.
     * @returns 50 when there is no price movement to measure (fewer than two
     *          closes, or every close identical).
     */
    private rsi(closes: number[], n: number): number {
        let avgGain = 0;
        let avgMove = 0;

        for (let i = 1; i < closes.length; i++) {
            const diff = closes[i] - closes[i - 1];
            const gain = Math.max(diff, 0);
            const move = Math.abs(diff);

            if (i === 1) {
                avgGain = gain;
                avgMove = move;
            } else {
                avgGain = (gain + (n - 1) * avgGain) / n;
                avgMove = (move + (n - 1) * avgMove) / n;
            }
        }

        if (avgMove === 0) return 50;
        return Number(((avgGain / avgMove) * 100).toFixed(2));
    }

    /**
     * Exponential moving average with smoothing factor `2 / (n + 1)`, seeded
     * with the first value.
     *
     * @param data Values ordered oldest first.
     * @param n EMA window.
     * @returns One EMA value per input value; empty for empty input.
     */
    private EMA(data: number[], n: number): number[] {
        if (data.length === 0) return [];

        const k = 2 / (n + 1);
        const ema: number[] = [data[0]];

        for (let i = 1; i < data.length; i++) {
            ema.push(data[i] * k + ema[i - 1] * (1 - k));
        }

        return ema;
    }
}

3.4 数据同步服务

文件: src/services/dataSyncService.ts

import axios from 'axios';

/**
 * Pulls market data from the AKShare HTTP service and stores it in the
 * database. Both sync methods are best-effort: failures are logged and
 * swallowed so a scheduler calling them keeps running.
 */
export class DataSyncService {
    // Base URL of the AKShare service; configurable through AKSHARE_URL,
    // defaulting to the previously hard-coded local address.
    private akshareBaseUrl = process.env.AKSHARE_URL ?? 'http://localhost:8000';

    /**
     * Fetches the current quote of every stock from `/stock_zh_a_spot` and
     * appends one `stockQuote` row per stock, all stamped with the same
     * `quoteTime`.
     */
    async syncRealTimeQuotes() {
        try {
            const response = await axios.get(`${this.akshareBaseUrl}/stock_zh_a_spot`);
            const quotes: unknown = response.data;

            if (!Array.isArray(quotes) || quotes.length === 0) {
                console.warn('Sync quotes skipped: upstream returned no quote rows');
                return;
            }

            // One timestamp for the whole batch so a snapshot is identifiable.
            const quoteTime = new Date();

            // A single bulk insert instead of one INSERT per stock in a transaction.
            await prisma.stockQuote.createMany({
                data: quotes.map((quote: any) => ({
                    stockCode: quote.code,
                    price: quote.price,
                    open: quote.open,
                    high: quote.high,
                    low: quote.low,
                    preClose: quote.pre_close,
                    volume: quote.volume,
                    turnover: quote.turnover,
                    changePercent: quote.change_percent,
                    quoteTime
                }))
            });

            console.log(`Synced ${quotes.length} quotes`);
        } catch (error) {
            console.error('Sync quotes failed:', error);
        }
    }

    /**
     * Fetches the K-line history of one stock from `/stock_zh_a_hist`
     * (from 2023-01-01 up to today's UTC date) and upserts each row, keyed by
     * stock code, period and date.
     *
     * @param stockCode Stock code, sent upstream as `symbol`.
     * @param period K-line period; `'day'` is sent upstream as `'daily'`,
     *               any other value is passed through unchanged.
     */
    async syncKLineData(stockCode: string, period: string = 'day') {
        try {
            const response = await axios.get(
                `${this.akshareBaseUrl}/stock_zh_a_hist`,
                {
                    params: {
                        symbol: stockCode,
                        period: period === 'day' ? 'daily' : period,
                        start_date: '20230101',
                        end_date: new Date().toISOString().split('T')[0].replace(/-/g, '')
                    }
                }
            );

            const klines: unknown = response.data;

            if (!Array.isArray(klines) || klines.length === 0) {
                console.warn(`Sync kline skipped for ${stockCode}: upstream returned no rows`);
                return;
            }

            await prisma.$transaction(
                klines.map((k: any) => {
                    const date = new Date(k.date);
                    const prices = {
                        open: k.open,
                        high: k.high,
                        low: k.low,
                        close: k.close,
                        volume: k.volume
                    };

                    return prisma.kLineData.upsert({
                        where: {
                            stockCode_period_date: { stockCode, period, date }
                        },
                        update: prices,
                        create: { stockCode, period, date, ...prices }
                    });
                })
            );

            console.log(`Synced ${klines.length} klines for ${stockCode}`);
        } catch (error) {
            console.error(`Sync kline failed for ${stockCode}:`, error);
        }
    }
}

四、WebSocket 实现

文件: src/websocket/stockSocket.ts

import { Server } from 'socket.io';

/**
 * Socket.IO gateway for live quotes. Clients join and leave rooms through the
 * `subscribe` / `unsubscribe` events; the server pushes `quote` events into
 * the `stock:<code>` and `sector:<code>` rooms.
 */
export class StockSocket {
    private io: Server;

    /**
     * @param server HTTP server instance the Socket.IO server attaches to.
     */
    constructor(server: any) {
        this.io = new Server(server, {
            cors: {
                // NOTE(review): '*' accepts connections from any origin — confirm
                // this is intended before exposing the service publicly.
                origin: '*',
                methods: ['GET', 'POST']
            }
        });

        this.setupHandlers();
    }

    /**
     * Normalises a client-supplied subscription payload.
     *
     * The payload comes straight from the network, so it may be anything.
     * Calling `.forEach` on a non-array would throw inside the event handler.
     *
     * @param payload Raw event argument received from the client.
     * @returns The non-empty string entries of `payload`, or `[]` when it is
     *          not an array.
     */
    private static toChannelList(payload: unknown): string[] {
        if (!Array.isArray(payload)) return [];

        return payload.filter(
            (entry): entry is string => typeof entry === 'string' && entry.length > 0
        );
    }

    /** Registers the per-connection subscribe / unsubscribe / disconnect handlers. */
    private setupHandlers() {
        this.io.on('connection', (socket) => {
            console.log('Client connected:', socket.id);

            // Subscribe: join one room per requested channel.
            // NOTE(review): any channel name is accepted — consider restricting
            // it to the rooms the server actually broadcasts to.
            socket.on('subscribe', (channels: unknown) => {
                for (const channel of StockSocket.toChannelList(channels)) {
                    socket.join(channel);
                    console.log(`Client ${socket.id} subscribed to ${channel}`);
                }
            });

            // Unsubscribe: leave the listed rooms.
            socket.on('unsubscribe', (channels: unknown) => {
                for (const channel of StockSocket.toChannelList(channels)) {
                    socket.leave(channel);
                    console.log(`Client ${socket.id} unsubscribed from ${channel}`);
                }
            });

            socket.on('disconnect', () => {
                console.log('Client disconnected:', socket.id);
            });
        });
    }

    /**
     * Pushes a stock quote to every client in the `stock:<stockCode>` room.
     *
     * @param stockCode Stock code identifying the room.
     * @param data Quote payload forwarded as-is.
     */
    broadcastStockQuote(stockCode: string, data: any) {
        this.io.to(`stock:${stockCode}`).emit('quote', {
            channel: `stock:${stockCode}`,
            type: 'quote',
            data
        });
    }

    /**
     * Pushes a sector quote to every client in the `sector:<sectorCode>` room.
     *
     * @param sectorCode Sector code identifying the room.
     * @param data Quote payload forwarded as-is.
     */
    broadcastSectorQuote(sectorCode: string, data: any) {
        this.io.to(`sector:${sectorCode}`).emit('quote', {
            channel: `sector:${sectorCode}`,
            type: 'quote',
            data
        });
    }
}

五、定时任务

文件: src/jobs/syncMarketData.ts

import cron from 'node-cron';
import { DataSyncService } from '../services/dataSyncService';

const dataSyncService = new DataSyncService();

// True while a real-time sync is running. The job fires every 3 seconds, so a
// slow sync would otherwise overlap with the next tick and pile up work.
let quoteSyncInFlight = false;

/**
 * Tells whether `now` falls inside a trading session: Monday to Friday,
 * 09:30-11:30 or 13:00-15:00 (15:00 itself excluded).
 *
 * Uses the process's local time zone. Public holidays are not considered.
 * NOTE(review): presumably the server runs in China Standard Time — confirm,
 * or pass a `timezone` option to the scheduler.
 */
function isTradingTime(now: Date): boolean {
    const weekday = now.getDay();
    if (weekday === 0 || weekday === 6) return false;

    const minutesOfDay = now.getHours() * 60 + now.getMinutes();
    const inMorningSession = minutesOfDay >= 9 * 60 + 30 && minutesOfDay <= 11 * 60 + 30;
    const inAfternoonSession = minutesOfDay >= 13 * 60 && minutesOfDay < 15 * 60;

    return inMorningSession || inAfternoonSession;
}

// Every 3 seconds: sync real-time quotes, but only during trading hours and
// only when the previous sync has finished.
cron.schedule('*/3 * * * * *', async () => {
    if (quoteSyncInFlight || !isTradingTime(new Date())) return;

    quoteSyncInFlight = true;
    try {
        await dataSyncService.syncRealTimeQuotes();
    } finally {
        quoteSyncInFlight = false;
    }
});

// Every hour, on the hour: sync daily K-line data for every stock, one stock
// at a time.
// NOTE(review): `prisma` is not imported in this file's visible imports.
cron.schedule('0 * * * *', async () => {
    try {
        const stocks = await prisma.stock.findMany();
        for (const stock of stocks) {
            await dataSyncService.syncKLineData(stock.code);
        }
    } catch (error) {
        // A rejected promise escaping the callback would be an unhandled rejection.
        console.error('Hourly kline sync failed:', error);
    }
});

// Weekdays at 15:00 (after the close): recompute sector rankings.
// NOTE(review): `sectorService` is neither imported nor created in this file,
// and the SectorService shown in this document has no
// `calculateAndUpdateRankings` method — confirm where it is defined.
cron.schedule('0 15 * * 1-5', async () => {
    try {
        await sectorService.calculateAndUpdateRankings();
    } catch (error) {
        console.error('Sector ranking update failed:', error);
    }
});

六、环境变量

文件: .env

# Server configuration
PORT=3000
NODE_ENV=production

# Database configuration (MySQL connection string; replace user/password with real credentials)
DATABASE_URL=mysql://user:password@localhost:3306/aguzhitou

# Redis configuration (connection URL of the cache)
REDIS_URL=redis://localhost:6379

# JWT configuration
# NOTE: "your-secret-key" is a placeholder — set a long random secret and never commit it.
JWT_SECRET=your-secret-key
JWT_EXPIRES_IN=7d

# AKShare configuration (base URL of the AKShare HTTP service)
AKSHARE_URL=http://localhost:8000

# Logging configuration
LOG_LEVEL=info

七、部署脚本

文件: docker-compose.yml

version: '3.8'

services:
  # API server, built from the Dockerfile in this directory.
  app:
    build: .
    ports:
      - "3000:3000"
    environment:
      # The root password is taken from MYSQL_ROOT_PASSWORD (shell or compose
      # .env file) and falls back to the previous value "rootpass". It must
      # match the value given to the mysql service below. Passwords containing
      # URL-reserved characters must be percent-encoded here.
      - DATABASE_URL=mysql://root:${MYSQL_ROOT_PASSWORD:-rootpass}@mysql:3306/aguzhitou
      - REDIS_URL=redis://redis:6379
      - JWT_SECRET=${JWT_SECRET}
    depends_on:
      - mysql
      - redis
    restart: always

  # MySQL 8.0 database; data is kept in the mysql_data volume.
  mysql:
    image: mysql:8.0
    environment:
      - MYSQL_ROOT_PASSWORD=${MYSQL_ROOT_PASSWORD:-rootpass}
      - MYSQL_DATABASE=aguzhitou
    volumes:
      - mysql_data:/var/lib/mysql
    # NOTE(review): publishing 3306 exposes the database on the host — confirm
    # this is needed outside local development.
    ports:
      - "3306:3306"
    restart: always

  # Redis cache; data is kept in the redis_data volume.
  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    # NOTE(review): publishing 6379 exposes an unauthenticated Redis on the
    # host — confirm this is needed outside local development.
    ports:
      - "6379:6379"
    restart: always

  # Nginx front end for HTTP and HTTPS.
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      # Configuration and certificates are only read, so mount them read-only.
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
      - ./ssl:/etc/nginx/ssl:ro
    depends_on:
      - app
    restart: always

volumes:
  mysql_data:
  redis_data:

八、测试

// Example unit test
describe('StockService', () => {
    let service: StockService;

    // A fresh service instance for every test case.
    beforeEach(() => {
        service = new StockService();
    });

    it('should calculate MA correctly', () => {
        // Six consecutive closes, oldest first.
        const closes = [10, 11, 12, 13, 14, 15];
        const bars = closes.map(close => ({ close }));

        // calculateMA is private, hence the bracket access.
        const withMA = service['calculateMA'](bars);
        const lastBar = withMA[5];

        // The 5-bar average at the last row covers the trailing five closes:
        // (11 + 12 + 13 + 14 + 15) / 5 = 13
        expect(lastBar.ma5).toBe(13);
    });
});

九、待实现清单

  • 数据库表创建
  • Prisma schema 定义
  • API 路由实现
  • WebSocket 服务
  • 定时任务配置
  • 单元测试
  • Docker 部署
  • CI/CD 配置
  • 监控告警
  • 日志收集