import prisma from '../config/database';
import { cache } from '../config/redis';
import logger from '../utils/logger';
import { externalDataSourceService } from './externalDataSourceService';

export class DataSyncService {
  // Delay helper.
  private delay(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  // Sync real-time quotes (mock data).
  async syncRealTimeQuotes(): Promise<void> {
    try {
      logger.info('Starting real-time quotes sync...');

      const stocks = await prisma.stock.findMany();
      const now = new Date();

      for (const stock of stocks.slice(0, 100)) {
        try {
          const basePrice = 10 + Math.random() * 100;
          const changePercent = Math.random() * 10 - 5;
          const price = basePrice * (1 + changePercent / 100);
          const open = basePrice * (1 + (Math.random() * 4 - 2) / 100);

          // Use create (not upsert) because quoteTime differs on every run.
          await prisma.stockQuote.create({
            data: {
              stockCode: stock.code,
              price,
              open,
              // Bound high/low by both open and price so that low <= open, price <= high.
              high: Math.max(open, price) * (1 + Math.random() * 0.02),
              low: Math.min(open, price) * (1 - Math.random() * 0.02),
              preClose: basePrice,
              volume: BigInt(Math.floor(Math.random() * 10000000)),
              turnover: BigInt(Math.floor(Math.random() * 100000000)),
              changePercent,
              quoteTime: now,
            },
          });
        } catch (error) {
          logger.error(`Failed to sync quote for ${stock.code}:`, error);
        }
      }

      logger.info('Real-time quotes sync completed');
    } catch (error) {
      logger.error('Failed to sync real-time quotes:', error);
      throw error;
    }
  }

  // Sync sector quotes (mock data).
  async syncSectorQuotes(): Promise<void> {
    try {
      logger.info('Starting sector quotes sync...');

      const sectors = await prisma.sector.findMany();
      const now = new Date();

      for (const sector of sectors) {
        try {
          const changePercent = Math.random() * 6 - 3;

          await prisma.sectorQuote.create({
            data: {
              sectorCode: sector.code,
              current: 1000 + Math.random() * 500,
              change: changePercent * 10,
              changePercent,
              volume: BigInt(Math.floor(Math.random() * 1000000000)),
              turnover: BigInt(Math.floor(Math.random() * 10000000000)),
              quoteTime: now,
            },
          });
        } catch (error) {
          logger.error(`Failed to sync sector quote for ${sector.code}:`,
            error);
        }
      }

      logger.info('Sector quotes sync completed');
    } catch (error) {
      logger.error('Failed to sync sector quotes:', error);
      throw error;
    }
  }

  // Sync K-line data for a single stock.
  async syncKLineData(stockCode: string, period: string = 'day'): Promise<void> {
    try {
      logger.info(`Starting K-line sync for ${stockCode} (${period})...`);

      // Find the date of the most recent stored K-line.
      const latestKLine = await prisma.stockKLine.findFirst({
        where: { stockCode, period },
        orderBy: { date: 'desc' },
      });

      const DAY_MS = 24 * 60 * 60 * 1000;
      const startDate = latestKLine
        ? new Date(latestKLine.date.getTime() + DAY_MS)
        : new Date(Date.now() - 365 * DAY_MS);
      const endDate = new Date();

      // Mock data: generate at most 100 daily bars, ending today.
      const days = Math.min(100, Math.ceil((endDate.getTime() - startDate.getTime()) / DAY_MS));

      for (let i = days; i >= 0; i--) {
        const date = new Date(endDate);
        date.setDate(date.getDate() - i);
        // Truncate to local midnight so that repeated runs hit the same
        // (stockCode, period, date) key instead of inserting a new row each time.
        date.setHours(0, 0, 0, 0);

        const basePrice = 10 + Math.random() * 100;
        const change = Math.random() * 0.1 - 0.05;
        const close = basePrice * (1 + change);
        const open = basePrice * (1 + (Math.random() * 0.04 - 0.02));
        const high = Math.max(open, close) * (1 + Math.random() * 0.02);
        const low = Math.min(open, close) * (1 - Math.random() * 0.02);
        const volume = BigInt(Math.floor(Math.random() * 10000000));

        try {
          await prisma.stockKLine.upsert({
            where: {
              stockCode_period_date: { stockCode, period, date },
            },
            update: { open, high, low, close, volume },
            create: { stockCode, period, date, open, high, low, close, volume },
          });
        } catch (error) {
          logger.error(`Failed to upsert K-line for ${stockCode} on ${date}:`, error);
        }
      }

      logger.info(`K-line sync completed for ${stockCode}`);
    } catch (error) {
      logger.error(`Failed to sync K-line for ${stockCode}:`, error);
      throw error;
    }
  }

  // Batch-sync K-line data for all stocks.
  async syncAllStocksKLine(period: string = 'day'): Promise<void> {
    try {
      logger.info(`Starting batch K-line sync for period: ${period}...`);

      const stocks = await prisma.stock.findMany({
        select: { code: true },
      });

      let successCount = 0;
      let failCount = 0;

      for (const stock of
          stocks) {
        try {
          await this.syncKLineData(stock.code, period);
          successCount++;
          await this.delay(100);
        } catch (error) {
          logger.error(`Failed to sync K-line for ${stock.code}:`, error);
          failCount++;
        }
      }

      logger.info(`Batch K-line sync completed: ${successCount} success, ${failCount} failed`);
    } catch (error) {
      logger.error('Failed to batch sync K-line data:', error);
      throw error;
    }
  }

  // Sync daily quotes for all stocks (from the external data source).
  async syncAllStocks(): Promise<void> {
    try {
      logger.info('[syncAllStocks] Starting daily quote sync for all stocks...');

      // Check that the external data source is available.
      if (!externalDataSourceService.isEnabled()) {
        throw new Error('Data source is not configured; cannot sync stock data');
      }

      // 1. Find the latest trade day stored in the local database.
      const latestQuote = await prisma.stockDailyQuote.findFirst({
        orderBy: { tradeDay: 'desc' },
        select: { tradeDay: true },
      });

      const startDate = latestQuote?.tradeDay
        ? new Date(latestQuote.tradeDay.getTime() + 24 * 60 * 60 * 1000)
        : new Date(Date.now() - 30 * 24 * 60 * 60 * 1000); // Default: 30 days ago.

      logger.info(`[syncAllStocks] Latest local date: ${latestQuote?.tradeDay?.toISOString().split('T')[0] || 'none'}, start date: ${startDate.toISOString().split('T')[0]}`);

      // 2. Fetch the trading calendar (an array of trade-day strings, YYYYMMDD).
      const tradingDateList = await externalDataSourceService.getTradingDates(
        startDate.toISOString().split('T')[0].replace(/-/g, ''),
        new Date().toISOString().split('T')[0].replace(/-/g, '')
      );

      logger.info(`[syncAllStocks] ${tradingDateList.length} trading days to sync: ${tradingDateList.join(', ')}`);

      if (tradingDateList.length === 0) {
        logger.info('[syncAllStocks] No trading days to sync');
        return;
      }

      // 3. Fetch all stock symbols.
      const symbols = await externalDataSourceService.getSymbols({ type: 'stock', limit: 6000 });

      // Keep the full symbol information for the API calls further down.
      const stockList = symbols
        .filter(s => s.symbol_id && s.status === 'active') // Keep active stocks only.
        .map(s => ({
          code: s.symbol_id?.replace(/\.(SZ|SH|BJ)$/i, '') || '', // Bare code, used in the database.
          symbol_id: s.symbol_id || '', // Full code (with exchange suffix), used for API calls.
          name: s.name || '',
          exchange: s.exchange || '',
        }))
        .filter(s => s.code && s.symbol_id); // Drop invalid entries.

      logger.info(`[syncAllStocks] Got ${stockList.length} active stocks, sample: ${stockList.slice(0, 5).map(s => s.symbol_id).join(', ')}`);

      // 4. Iterate over the trade days, fetching and saving data in batches.
      let totalSyncedDays = 0;
      let totalSyncedRecords = 0;

      for (const tradeDate of tradingDateList) {
        try {
          logger.info(`[syncAllStocks] Syncing data for ${tradeDate}...`);

          // Format the date as YYYY-MM-DD.
          const formattedDate = `${tradeDate.slice(0, 4)}-${tradeDate.slice(4, 6)}-${tradeDate.slice(6, 8)}`;

          // Fetch that day's quotes for all stocks, 10 stocks per batch.
          const batchSize = 10;
          let daySyncedCount = 0;

          for (let i = 0; i < stockList.length; i += batchSize) {
            const batch = stockList.slice(i, i + batchSize);

            try {
              // Fetch daily quotes for the batch from the external data source.
              const quotes = await this.fetchDailyQuotes(batch, formattedDate);

              // Save the batch to the database.
              await this.saveDailyQuotes(quotes, formattedDate);

              daySyncedCount += quotes.length;
              totalSyncedRecords += quotes.length;

              // Throttle so requests are not sent too quickly.
              await this.delay(200);
            } catch (error) {
              logger.error(`[syncAllStocks] Failed to fetch batch ${i / batchSize + 1} for ${formattedDate}:`, error);
            }
          }

          totalSyncedDays++;
          logger.info(`[syncAllStocks] ${formattedDate} synced: ${daySyncedCount} records`);
        } catch (error) {
          logger.error(`[syncAllStocks] Failed to sync ${tradeDate}:`, error);
        }
      }

      logger.info(`[syncAllStocks] Sync completed: ${totalSyncedDays} days, ${totalSyncedRecords} records`);
    } catch (error) {
      logger.error('[syncAllStocks] Sync failed:', error);
      throw new Error('Data source error, stock sync failed: ' + (error as Error).message);
    }
  }

  // Fetch daily quotes for a batch of stocks (helper).
  private async fetchDailyQuotes(
    stocks: { code: string; symbol_id: string; name: string; exchange: string }[],
    tradeDate: string
  ): Promise<any[]> {
    // Fetch daily bars through the K-line endpoint.
    const quotes: any[] = [];
    const compactDate = tradeDate.replace(/-/g, '');

    for (const stock of stocks) {
      try {
        // Call the API with the full symbol_id (with suffix, e.g. 000001.SZ).
        const klines = await externalDataSourceService.getKLines(stock.symbol_id, 'day', {
          startDate: compactDate,
          endDate: compactDate,
          limit: 1,
        });

        if (klines && klines.length > 0) {
          const k = klines[0];
          // Store under the bare code (without suffix) in the database.
          quotes.push({
            stockCode: stock.code,
            stockName: stock.name,
            open: k.open,
            close: k.close,
            high: k.high,
            low: k.low,
            volume: k.volume,
            amount: k.amount || 0,
            // Intraday change in percent; guard against a zero open price.
            differrange: k.open ? ((k.close - k.open) / k.open) * 100 : 0,
          });
        }
      } catch (error) {
        logger.warn(`[fetchDailyQuotes] Failed to fetch ${stock.symbol_id} for ${tradeDate}:`, error);
      }
    }

    return quotes;
  }

  // Save a batch of daily quotes (helper).
  private async saveDailyQuotes(quotes: any[], tradeDate: string): Promise<void> {
    const date = new Date(tradeDate);

    for (const quote of quotes) {
      try {
        // BigInt() throws a RangeError on non-integer numbers, so round first.
        const volume = BigInt(Math.round(Number(quote.volume) || 0));
        const amount = BigInt(Math.round(Number(quote.amount) || 0));
        const fields = {
          open: quote.open,
          close: quote.close,
          high: quote.high,
          low: quote.low,
          volume,
          amount,
          differrange: quote.differrange,
        };

        await prisma.stockDailyQuote.upsert({
          where: {
            stockCode_tradeDay: {
              stockCode: quote.stockCode,
              tradeDay: date,
            },
          },
          update: fields,
          create: {
            stockCode: quote.stockCode,
            tradeDay: date,
            ...fields,
          },
        });
      } catch (error) {
        logger.error(`[saveDailyQuotes] Failed to save ${quote.stockCode}:`, error);
      }
    }
  }

  // Sync sector data.
  async syncSectors(): Promise<void> {
    try {
      logger.info('Starting sync sectors...');

      const sectors = [
        { name: '半导体', code: '880491' },
        { name: '新能源', code: '880952' },
        { name: '医药生物', code: '880122' },
        // NOTE: same code as '新能源' above. Because the upsert is keyed by code,
        // this entry overwrites that one; the intended code needs to be verified.
        { name: '白酒', code: '880952' },
        { name: '银行', code: '880471' },
        { name: '证券', code: '880472' },
        { name: '保险', code: '880473' },
        { name: '房地产', code: '880482' },
        { name: '汽车', code: '880391' },
        { name: '电子', code: '880494' },
      ];

      for (const sector of sectors) {
        try {
          await prisma.sector.upsert({
            where: {
              code: sector.code },
            update: { name: sector.name },
            create: sector,
          });
        } catch (error) {
          logger.error(`Failed to upsert sector ${sector.code}:`, error);
        }
      }

      logger.info(`Synced ${sectors.length} sectors`);
    } catch (error) {
      logger.error('Failed to sync sectors:', error);
      throw error;
    }
  }

  // Calculate momentum scores.
  async calculateMomentumScores(): Promise<void> {
    try {
      logger.info('Starting calculate momentum scores...');

      const stocks = await prisma.stock.findMany({
        include: {
          klines: {
            where: { period: 'day' },
            orderBy: { date: 'desc' },
            take: 20,
          },
        },
      });

      // Scores are keyed by calendar day (local midnight).
      const today = new Date();
      today.setHours(0, 0, 0, 0);

      // Average over the values actually present, so MA10/MA20 stay correct
      // for stocks with fewer than 10 or 20 stored K-lines.
      const average = (values: number[]): number =>
        values.reduce((a, b) => a + b, 0) / values.length;

      for (const stock of stocks) {
        try {
          if (stock.klines.length < 5) continue;

          // K-lines arrive newest first; reverse to oldest first.
          const prices = stock.klines.map(k => k.close).reverse();
          const ma5 = average(prices.slice(-5));
          const ma10 = average(prices.slice(-10));
          const ma20 = average(prices.slice(-20));

          const currentPrice = prices[prices.length - 1];
          const priceChange = (currentPrice - prices[0]) / prices[0] * 100;

          let score = 50;
          if (currentPrice > ma5) score += 10;
          if (currentPrice > ma10) score += 10;
          if (currentPrice > ma20) score += 10;
          if (priceChange > 5) score += 10;
          if (priceChange > 10) score += 10;

          await prisma.momentumStock.upsert({
            where: {
              stockCode_date: {
                stockCode: stock.code,
                date: today,
              },
            },
            update: {
              momentumScore: score,
            },
            create: {
              stockCode: stock.code,
              momentumScore: score,
              volumeRatio: 1 + Math.random() * 2, // Mock value.
              breakThrough: score > 70,
              date: today,
            },
          });
        } catch (error) {
          logger.error(`Failed to calculate momentum for ${stock.code}:`, error);
        }
      }

      logger.info('Momentum scores calculation completed');
    } catch (error) {
      logger.error('Failed to calculate momentum scores:', error);
      throw error;
    }
  }

  // Sync market indices.
  async syncMarketIndices(): Promise<void> {
    try {
      logger.info('Starting sync market indices...');

      const indices = [
        { name: '上证指数', code: '000001' },
        { name: '深证成指', code: '399001' },
        { name: '创业板指', code: '399006' },
      ];

      for (const index of
          indices) {
        try {
          // Mock values, generated once and shared by the update and create branches.
          const quote = {
            current: 3000 + Math.random() * 500,
            change: Math.random() * 40 - 20,
            changePercent: Math.random() * 2 - 1,
            volume: BigInt(Math.floor(Math.random() * 1000000000)),
            turnover: BigInt(Math.floor(Math.random() * 10000000000)),
          };

          await prisma.marketIndex.upsert({
            where: { code: index.code },
            update: quote,
            create: {
              name: index.name,
              code: index.code,
              ...quote,
            },
          });
        } catch (error) {
          logger.error(`Failed to upsert market index ${index.code}:`, error);
        }
      }

      logger.info('Market indices sync completed');
    } catch (error) {
      logger.error('Failed to sync market indices:', error);
      throw error;
    }
  }

  // Initialize base data.
  async initBaseData(): Promise<void> {
    try {
      logger.info('Initializing base data...');

      // Check that the external data source is available.
      if (externalDataSourceService.isEnabled()) {
        try {
          // Sync stocks.
          await this.syncAllStocks();
        } catch (error) {
          logger.warn('Stock sync failed during init, will retry later:', error);
        }
      } else {
        logger.warn('External data source not enabled, skipping stock sync during init');
      }

      // Sync sectors.
      await this.syncSectors();

      // Sync market indices.
      await this.syncMarketIndices();

      logger.info('Base data initialization completed');
    } catch (error) {
      logger.error('Failed to initialize base data:', error);
    }
  }
}

export const dataSyncService = new DataSyncService();
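
/*
Illustrative sketch only, kept in a comment so it does not change this module.
The scoring rule inside calculateMomentumScores (start at 50, add 10 for each of
MA5/MA10/MA20 that the latest close exceeds, add 10 more for a gain above 5% and
again above 10%) could be pulled out into a pure function so it can be unit-tested
without Prisma. The name `momentumScore` and the `null` return for short histories
are assumptions made for this example; they are not part of the service.

```typescript
// Hypothetical helper: scores closing prices ordered oldest to newest,
// using the same thresholds as calculateMomentumScores.
function momentumScore(closes: number[]): number | null {
  // The service skips stocks with fewer than 5 K-lines.
  if (closes.length < 5) return null;

  const average = (values: number[]): number =>
    values.reduce((a, b) => a + b, 0) / values.length;

  const current = closes[closes.length - 1];
  const changePercent = ((current - closes[0]) / closes[0]) * 100;

  let score = 50;
  for (const window of [5, 10, 20]) {
    // slice(-window) returns fewer items when the history is short;
    // average() divides by the number of items actually present.
    if (current > average(closes.slice(-window))) score += 10;
  }
  if (changePercent > 5) score += 10;
  if (changePercent > 10) score += 10;
  return score;
}
```

With a helper shaped like this, the loop in calculateMomentumScores would only
need to map each stock's K-lines to closes and persist the returned score.
*/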