You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

553 lines
18 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import prisma from '../config/database';
import { cache } from '../config/redis';
import logger from '../utils/logger';
import { externalDataSourceService } from './externalDataSourceService';
/** One daily quote row as handed from `fetchDailyQuotes` to `saveDailyQuotes`. */
interface DailyQuoteRecord {
  stockCode: string;
  stockName: string;
  open: number;
  close: number;
  high: number;
  low: number;
  volume: number | bigint | string | null | undefined;
  amount: number | bigint | string | null | undefined;
  differrange: number;
}

/**
 * Populates the database with market data.
 *
 * Real-time quotes, sector quotes, K-lines and market indices are filled with
 * randomly generated (mock) values; daily quotes are pulled from
 * `externalDataSourceService`.
 */
export class DataSyncService {
  private static readonly MS_PER_DAY = 24 * 60 * 60 * 1000;

  /** Resolves after `ms` milliseconds; used to throttle loops. */
  private delay(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  /** Returns a copy of `input` set to local midnight, so it can serve as a stable per-day key. */
  private static startOfDay(input: Date): Date {
    const day = new Date(input);
    day.setHours(0, 0, 0, 0);
    return day;
  }

  /**
   * Converts a loosely typed numeric value to `bigint` without throwing.
   *
   * `BigInt(1.5)` raises a RangeError, so fractional numbers are truncated;
   * null/undefined/NaN/unparseable input becomes 0.
   */
  private static toBigInt(value: unknown): bigint {
    if (typeof value === 'bigint') return value;
    if (typeof value === 'string' && /^-?\d+$/.test(value.trim())) {
      // Integer strings are converted directly to keep full precision.
      return BigInt(value.trim());
    }
    const numeric = Number(value ?? 0);
    return Number.isFinite(numeric) ? BigInt(Math.trunc(numeric)) : BigInt(0);
  }

  /**
   * Scores momentum from closing prices ordered oldest to newest.
   *
   * Starts at 50 and adds 10 for each of: last price above the 5/10/20-bar
   * average, and total change above 5% / above 10%. When fewer bars than the
   * window are available the average is taken over the bars that exist.
   */
  private static computeMomentumScore(prices: readonly number[]): number {
    if (prices.length === 0) return 50;

    const averageOfLast = (window: number): number => {
      const tail = prices.slice(-window);
      return tail.reduce((sum, p) => sum + p, 0) / tail.length;
    };

    const currentPrice = prices[prices.length - 1];
    const firstPrice = prices[0];
    // Guard the division so a zero first price cannot yield NaN/Infinity.
    const priceChange = firstPrice !== 0 ? ((currentPrice - firstPrice) / firstPrice) * 100 : 0;

    let score = 50;
    if (currentPrice > averageOfLast(5)) score += 10;
    if (currentPrice > averageOfLast(10)) score += 10;
    if (currentPrice > averageOfLast(20)) score += 10;
    if (priceChange > 5) score += 10;
    if (priceChange > 10) score += 10;
    return score;
  }

  /**
   * Inserts one mock real-time quote for each of up to 100 stocks.
   *
   * @throws Rethrows when the stock list cannot be loaded; per-stock insert
   *         failures are logged and skipped.
   */
  async syncRealTimeQuotes(): Promise<void> {
    try {
      logger.info('Starting real-time quotes sync...');
      // Only the first 100 stocks are used, so let the database do the limiting.
      const stocks = await prisma.stock.findMany({
        take: 100,
        select: { code: true },
      });
      const now = new Date();

      for (const stock of stocks) {
        try {
          const basePrice = 10 + Math.random() * 100;
          const changePercent = Math.random() * 10 - 5;
          const price = basePrice * (1 + changePercent / 100);
          const open = basePrice * (1 + (Math.random() * 4 - 2) / 100);

          // `create` rather than upsert: every run writes a new row for this quote time.
          await prisma.stockQuote.create({
            data: {
              stockCode: stock.code,
              price,
              open,
              // high/low must bracket both open and price.
              high: Math.max(open, price) * (1 + Math.random() * 0.02),
              low: Math.min(open, price) * (1 - Math.random() * 0.02),
              preClose: basePrice,
              volume: BigInt(Math.floor(Math.random() * 10000000)),
              turnover: BigInt(Math.floor(Math.random() * 100000000)),
              changePercent,
              quoteTime: now,
            },
          });
        } catch (error) {
          logger.error(`Failed to sync quote for ${stock.code}:`, error);
        }
      }

      logger.info('Real-time quotes sync completed');
    } catch (error) {
      logger.error('Failed to sync real-time quotes:', error);
      throw error;
    }
  }

  /**
   * Inserts one mock quote for every sector.
   *
   * @throws Rethrows when the sector list cannot be loaded; per-sector insert
   *         failures are logged and skipped.
   */
  async syncSectorQuotes(): Promise<void> {
    try {
      logger.info('Starting sector quotes sync...');
      const sectors = await prisma.sector.findMany();
      const now = new Date();

      for (const sector of sectors) {
        try {
          const changePercent = Math.random() * 6 - 3;
          await prisma.sectorQuote.create({
            data: {
              sectorCode: sector.code,
              current: 1000 + Math.random() * 500,
              change: changePercent * 10,
              changePercent,
              volume: BigInt(Math.floor(Math.random() * 1000000000)),
              turnover: BigInt(Math.floor(Math.random() * 10000000000)),
              quoteTime: now,
            },
          });
        } catch (error) {
          logger.error(`Failed to sync sector quote for ${sector.code}:`, error);
        }
      }

      logger.info('Sector quotes sync completed');
    } catch (error) {
      logger.error('Failed to sync sector quotes:', error);
      throw error;
    }
  }

  /**
   * Upserts mock K-line bars for one stock, from the day after the latest
   * stored bar (or 365 days ago when none exists) through today, reaching at
   * most 100 days back.
   *
   * Dates are normalised to local midnight so that re-running on the same day
   * hits the `stockCode_period_date` key instead of inserting duplicates.
   * One bar is generated per calendar day regardless of `period`.
   *
   * @param stockCode Stock code the bars belong to.
   * @param period    K-line period label stored with each bar.
   * @throws Rethrows when the latest bar cannot be queried; per-bar upsert
   *         failures are logged and skipped.
   */
  async syncKLineData(stockCode: string, period: string = 'day'): Promise<void> {
    try {
      logger.info(`Starting K-line sync for ${stockCode} (${period})...`);

      // Most recent stored bar decides where to resume.
      const latestKLine = await prisma.stockKLine.findFirst({
        where: { stockCode, period },
        orderBy: { date: 'desc' },
      });

      const endDate = DataSyncService.startOfDay(new Date());
      const startDate = latestKLine
        ? DataSyncService.startOfDay(latestKLine.date)
        : new Date(endDate);
      if (latestKLine) {
        startDate.setDate(startDate.getDate() + 1);
      } else {
        startDate.setDate(startDate.getDate() - 365);
      }

      // Math.round absorbs 23/25-hour days around DST changes.
      const gapDays = Math.round(
        (endDate.getTime() - startDate.getTime()) / DataSyncService.MS_PER_DAY
      );
      if (gapDays < 0) {
        logger.info(`K-line for ${stockCode} is already up to date`);
        return;
      }

      // Mock data: walk from the oldest missing day up to today.
      const days = Math.min(100, gapDays);
      for (let i = days; i >= 0; i--) {
        const date = new Date(endDate);
        date.setDate(date.getDate() - i);

        const basePrice = 10 + Math.random() * 100;
        const change = Math.random() * 0.1 - 0.05;
        const close = basePrice * (1 + change);
        const open = basePrice * (1 + (Math.random() * 0.04 - 0.02));
        const high = Math.max(open, close) * (1 + Math.random() * 0.02);
        const low = Math.min(open, close) * (1 - Math.random() * 0.02);
        const bar = {
          open,
          high,
          low,
          close,
          volume: BigInt(Math.floor(Math.random() * 10000000)),
        };

        try {
          await prisma.stockKLine.upsert({
            where: {
              stockCode_period_date: { stockCode, period, date },
            },
            update: bar,
            create: { stockCode, period, date, ...bar },
          });
        } catch (error) {
          logger.error(`Failed to upsert K-line for ${stockCode} on ${date}:`, error);
        }
      }

      logger.info(`K-line sync completed for ${stockCode}`);
    } catch (error) {
      logger.error(`Failed to sync K-line for ${stockCode}:`, error);
      throw error;
    }
  }

  /**
   * Runs `syncKLineData` for every stock sequentially, pausing 100 ms after
   * each success, and logs success/failure totals.
   *
   * @param period K-line period label forwarded to `syncKLineData`.
   * @throws Rethrows when the stock list cannot be loaded.
   */
  async syncAllStocksKLine(period: string = 'day'): Promise<void> {
    try {
      logger.info(`Starting batch K-line sync for period: ${period}...`);
      const stocks = await prisma.stock.findMany({
        select: { code: true },
      });

      let successCount = 0;
      let failCount = 0;
      for (const stock of stocks) {
        try {
          await this.syncKLineData(stock.code, period);
          successCount++;
          await this.delay(100);
        } catch (error) {
          logger.error(`Failed to sync K-line for ${stock.code}:`, error);
          failCount++;
        }
      }

      logger.info(`Batch K-line sync completed: ${successCount} success, ${failCount} failed`);
    } catch (error) {
      logger.error('Failed to batch sync K-line data:', error);
      throw error;
    }
  }

  /**
   * Pulls daily quotes for all active stocks from the external data source,
   * for every trading day after the newest `tradeDay` already stored
   * (default: the last 30 days).
   *
   * The resume point is the globally newest `tradeDay`, not a per-stock one.
   *
   * @throws Error with a "数据源异常" prefix when the data source is disabled
   *         or any of the set-up calls fail; per-day and per-batch failures
   *         are logged and skipped.
   */
  async syncAllStocks(): Promise<void> {
    const batchSize = 10;
    const isoDay = (d: Date): string => d.toISOString().slice(0, 10);
    const compactDay = (d: Date): string => isoDay(d).replace(/-/g, '');

    try {
      logger.info('[syncAllStocks] 开始同步所有股票日行情数据...');

      // The external data source must be configured.
      if (!externalDataSourceService.isEnabled()) {
        throw new Error('数据源未配置,无法同步股票数据');
      }

      // 1. Newest trade day already stored locally.
      const latestQuote = await prisma.stockDailyQuote.findFirst({
        orderBy: { tradeDay: 'desc' },
        select: { tradeDay: true },
      });
      const latestDay = latestQuote?.tradeDay;
      const startDate = latestDay
        ? new Date(latestDay.getTime() + DataSyncService.MS_PER_DAY)
        : new Date(Date.now() - 30 * DataSyncService.MS_PER_DAY); // default: 30 days back
      logger.info(`[syncAllStocks] 本地最新日期: ${latestDay ? isoDay(latestDay) : '无'}, 开始日期: ${isoDay(startDate)}`);

      // 2. Trading calendar: array of YYYYMMDD strings.
      const rangeStart = compactDay(startDate);
      const rangeEnd = compactDay(new Date());
      if (rangeStart > rangeEnd) {
        // Already synced through today; do not query with an inverted range.
        logger.info('[syncAllStocks] 没有需要同步的交易日');
        return;
      }
      const tradingDateList = await externalDataSourceService.getTradingDates(rangeStart, rangeEnd);
      logger.info(`[syncAllStocks] 需要同步 ${tradingDateList.length} 个交易日: ${tradingDateList.join(', ')}`);
      if (tradingDateList.length === 0) {
        logger.info('[syncAllStocks] 没有需要同步的交易日');
        return;
      }

      // 3. All stock symbols.
      const symbols = await externalDataSourceService.getSymbols({
        type: 'stock',
        limit: 6000
      });
      // Keep both forms of the code: bare for the database, suffixed for API calls.
      const stockList = symbols
        .filter(s => s.symbol_id && s.status === 'active') // active stocks only
        .map(s => ({
          code: s.symbol_id?.replace(/\.(SZ|SH|BJ)$/i, '') || '', // bare code, used in the database
          symbol_id: s.symbol_id || '', // full code with exchange suffix, used for API calls
          name: s.name || '',
          exchange: s.exchange || '',
        }))
        .filter(s => s.code && s.symbol_id); // drop invalid entries
      logger.info(`[syncAllStocks] 获取到 ${stockList.length} 只活跃股票,样例: ${stockList.slice(0, 5).map(s => s.symbol_id).join(', ')}`);

      // 4. Day by day, fetch and store in batches.
      let totalSyncedDays = 0;
      let totalSyncedRecords = 0;
      for (const tradeDate of tradingDateList) {
        try {
          logger.info(`[syncAllStocks] 同步 ${tradeDate} 的数据...`);
          // YYYYMMDD -> YYYY-MM-DD
          const formattedDate = `${tradeDate.slice(0, 4)}-${tradeDate.slice(4, 6)}-${tradeDate.slice(6, 8)}`;
          let daySyncedCount = 0;

          for (let i = 0; i < stockList.length; i += batchSize) {
            const batch = stockList.slice(i, i + batchSize);
            const batchNo = i / batchSize + 1;
            try {
              const quotes = await this.fetchDailyQuotes(batch, formattedDate);
              // Count only rows that were actually written.
              const saved = await this.saveDailyQuotes(quotes, formattedDate);
              daySyncedCount += saved;
              totalSyncedRecords += saved;
              // Throttle requests to the data source.
              await this.delay(200);
            } catch (error) {
              logger.error(`[syncAllStocks] 批量获取 ${formattedDate} 第 ${batchNo} 批数据失败:`, error);
            }
          }

          totalSyncedDays++;
          logger.info(`[syncAllStocks] ${formattedDate} 同步完成: ${daySyncedCount} 条记录`);
        } catch (error) {
          logger.error(`[syncAllStocks] 同步 ${tradeDate} 失败:`, error);
        }
      }

      logger.info(`[syncAllStocks] 同步完成: ${totalSyncedDays} 天, ${totalSyncedRecords} 条记录`);
    } catch (error) {
      logger.error('[syncAllStocks] 同步失败:', error);
      // The thrown value is not guaranteed to be an Error instance.
      const reason = error instanceof Error ? error.message : String(error);
      throw new Error('数据源异常,股票同步失败: ' + reason);
    }
  }

  /**
   * Fetches the daily bar of `tradeDate` for each stock via the K-line API.
   *
   * @param stocks    Stocks to query; `symbol_id` is sent to the API, `code` is kept for storage.
   * @param tradeDate Trade day as `YYYY-MM-DD`.
   * @returns One record per stock that returned a bar; failures are logged and omitted.
   */
  private async fetchDailyQuotes(
    stocks: { code: string; symbol_id: string; name: string; exchange: string }[],
    tradeDate: string
  ): Promise<DailyQuoteRecord[]> {
    const apiDate = tradeDate.replace(/-/g, '');
    const quotes: DailyQuoteRecord[] = [];

    for (const stock of stocks) {
      try {
        // The API is called with the full, suffixed symbol (e.g. 000001.SZ).
        const klines = await externalDataSourceService.getKLines(stock.symbol_id, 'day', {
          startDate: apiDate,
          endDate: apiDate,
          limit: 1,
        });
        if (!klines || klines.length === 0) continue;

        const k = klines[0];
        // Stored under the bare code (no suffix).
        quotes.push({
          stockCode: stock.code,
          stockName: stock.name,
          open: k.open,
          close: k.close,
          high: k.high,
          low: k.low,
          volume: k.volume,
          amount: k.amount ?? 0,
          // Intraday change in percent; a zero open would otherwise give NaN/Infinity.
          differrange: k.open ? ((k.close - k.open) / k.open) * 100 : 0,
        });
      } catch (error) {
        logger.warn(`[fetchDailyQuotes] 获取 ${stock.symbol_id} ${tradeDate} 数据失败`, error);
      }
    }

    return quotes;
  }

  /**
   * Upserts daily quotes keyed by (`stockCode`, `tradeDay`).
   *
   * @param quotes    Records produced by `fetchDailyQuotes`.
   * @param tradeDate Trade day as `YYYY-MM-DD`.
   * @returns Number of records written; failed records are logged and skipped.
   */
  private async saveDailyQuotes(quotes: DailyQuoteRecord[], tradeDate: string): Promise<number> {
    // A date-only ISO string is parsed as UTC midnight.
    const date = new Date(tradeDate);
    let saved = 0;

    for (const quote of quotes) {
      try {
        const fields = {
          open: quote.open,
          close: quote.close,
          high: quote.high,
          low: quote.low,
          // Plain BigInt() throws on fractional numbers, which would drop the row.
          volume: DataSyncService.toBigInt(quote.volume),
          amount: DataSyncService.toBigInt(quote.amount),
          differrange: quote.differrange,
        };
        await prisma.stockDailyQuote.upsert({
          where: {
            stockCode_tradeDay: {
              stockCode: quote.stockCode,
              tradeDay: date,
            },
          },
          update: fields,
          create: {
            stockCode: quote.stockCode,
            tradeDay: date,
            ...fields,
          },
        });
        saved++;
      } catch (error) {
        logger.error(`[saveDailyQuotes] 保存 ${quote.stockCode} 失败:`, error);
      }
    }

    return saved;
  }

  /**
   * Upserts the built-in sector list by code.
   *
   * @throws Rethrows unexpected errors; per-sector upsert failures are logged and skipped.
   */
  async syncSectors(): Promise<void> {
    try {
      logger.info('Starting sync sectors...');
      // NOTE(review): '880952' appears twice (新能源 and 白酒); the later entry
      // overwrites the earlier one's name. The intended code needs confirming.
      const sectors = [
        { name: '半导体', code: '880491' },
        { name: '新能源', code: '880952' },
        { name: '医药生物', code: '880122' },
        { name: '白酒', code: '880952' },
        { name: '银行', code: '880471' },
        { name: '证券', code: '880472' },
        { name: '保险', code: '880473' },
        { name: '房地产', code: '880482' },
        { name: '汽车', code: '880391' },
        { name: '电子', code: '880494' },
      ];

      const seenCodes = new Set<string>();
      const syncedCodes = new Set<string>();
      for (const sector of sectors) {
        if (seenCodes.has(sector.code)) {
          logger.warn(`Duplicate sector code ${sector.code} (${sector.name}) overwrites an earlier entry`);
        }
        seenCodes.add(sector.code);

        try {
          await prisma.sector.upsert({
            where: { code: sector.code },
            update: { name: sector.name },
            create: sector,
          });
          syncedCodes.add(sector.code);
        } catch (error) {
          logger.error(`Failed to upsert sector ${sector.code}:`, error);
        }
      }

      // Report distinct rows actually written, not the length of the list.
      logger.info(`Synced ${syncedCodes.size} sectors`);
    } catch (error) {
      logger.error('Failed to sync sectors:', error);
      throw error;
    }
  }

  /**
   * Computes today's momentum score for every stock with at least 5 daily
   * K-lines (using up to the 20 most recent) and upserts it keyed by
   * (`stockCode`, today at local midnight).
   *
   * @throws Rethrows when the stocks cannot be loaded; per-stock failures are
   *         logged and skipped.
   */
  async calculateMomentumScores(): Promise<void> {
    try {
      logger.info('Starting calculate momentum scores...');
      const stocks = await prisma.stock.findMany({
        include: {
          klines: {
            where: { period: 'day' },
            orderBy: { date: 'desc' },
            take: 20,
          },
        },
      });

      const today = DataSyncService.startOfDay(new Date());

      for (const stock of stocks) {
        try {
          if (stock.klines.length < 5) continue;

          // Query returns newest first; scoring expects oldest first.
          const prices = stock.klines.map(k => k.close).reverse();
          const score = DataSyncService.computeMomentumScore(prices);
          const breakThrough = score > 70;

          await prisma.momentumStock.upsert({
            where: {
              stockCode_date: {
                stockCode: stock.code,
                date: today,
              },
            },
            // Keep the flag in step with the score when the row already exists.
            update: {
              momentumScore: score,
              breakThrough,
            },
            create: {
              stockCode: stock.code,
              momentumScore: score,
              volumeRatio: 1 + Math.random() * 2,
              breakThrough,
              date: today,
            },
          });
        } catch (error) {
          logger.error(`Failed to calculate momentum for ${stock.code}:`, error);
        }
      }

      logger.info('Momentum scores calculation completed');
    } catch (error) {
      logger.error('Failed to calculate momentum scores:', error);
      throw error;
    }
  }

  /**
   * Upserts the three built-in market indices with mock values.
   *
   * @throws Rethrows unexpected errors; per-index upsert failures are logged and skipped.
   */
  async syncMarketIndices(): Promise<void> {
    try {
      logger.info('Starting sync market indices...');
      const indices = [
        { name: '上证指数', code: '000001' },
        { name: '深证成指', code: '399001' },
        { name: '创业板指', code: '399006' },
      ];

      for (const index of indices) {
        try {
          // Only one of update/create runs, so a single mock snapshot serves both.
          const snapshot = {
            current: 3000 + Math.random() * 500,
            change: Math.random() * 40 - 20,
            changePercent: Math.random() * 2 - 1,
            volume: BigInt(Math.floor(Math.random() * 1000000000)),
            turnover: BigInt(Math.floor(Math.random() * 10000000000)),
          };
          await prisma.marketIndex.upsert({
            where: { code: index.code },
            update: snapshot,
            create: {
              name: index.name,
              code: index.code,
              ...snapshot,
            },
          });
        } catch (error) {
          logger.error(`Failed to upsert market index ${index.code}:`, error);
        }
      }

      logger.info('Market indices sync completed');
    } catch (error) {
      logger.error('Failed to sync market indices:', error);
      throw error;
    }
  }

  /**
   * Best-effort initialisation: daily stock quotes (when the external data
   * source is enabled), then sectors, then market indices. Failures are
   * logged and never propagated to the caller.
   */
  async initBaseData(): Promise<void> {
    try {
      logger.info('Initializing base data...');

      // Stock sync depends on the external data source.
      if (externalDataSourceService.isEnabled()) {
        try {
          await this.syncAllStocks();
        } catch (error) {
          logger.warn('Stock sync failed during init, will retry later:', error);
        }
      } else {
        logger.warn('External data source not enabled, skipping stock sync during init');
      }

      await this.syncSectors();
      await this.syncMarketIndices();

      logger.info('Base data initialization completed');
    } catch (error) {
      logger.error('Failed to initialize base data:', error);
    }
  }
}
// Instance created at module load and exported for importers of this module.
export const dataSyncService = new DataSyncService();