You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

553 lines
18 KiB

import prisma from '../config/database';
import { cache } from '../config/redis';
import logger from '../utils/logger';
import { externalDataSourceService } from './externalDataSourceService';
export class DataSyncService {
// 延迟辅助函数
private delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
// 同步实时行情(模拟数据)
async syncRealTimeQuotes(): Promise<void> {
try {
logger.info('Starting real-time quotes sync...');
const stocks = await prisma.stock.findMany();
const now = new Date();
for (const stock of stocks.slice(0, 100)) {
try {
const basePrice = 10 + Math.random() * 100;
const changePercent = (Math.random() * 10 - 5);
const price = basePrice * (1 + changePercent / 100);
// 使用 create因为每次时间都不一样
await prisma.stockQuote.create({
data: {
stockCode: stock.code,
price: price,
open: basePrice * (1 + (Math.random() * 4 - 2) / 100),
high: price * (1 + Math.random() * 0.02),
low: price * (1 - Math.random() * 0.02),
preClose: basePrice,
volume: BigInt(Math.floor(Math.random() * 10000000)),
turnover: BigInt(Math.floor(Math.random() * 100000000)),
changePercent: changePercent,
quoteTime: now,
},
});
} catch (error) {
logger.error(`Failed to sync quote for ${stock.code}:`, error);
}
}
logger.info('Real-time quotes sync completed');
} catch (error) {
logger.error('Failed to sync real-time quotes:', error);
throw error;
}
}
// 同步版块行情(模拟数据)
async syncSectorQuotes(): Promise<void> {
try {
logger.info('Starting sector quotes sync...');
const sectors = await prisma.sector.findMany();
const now = new Date();
for (const sector of sectors) {
try {
const changePercent = Math.random() * 6 - 3;
await prisma.sectorQuote.create({
data: {
sectorCode: sector.code,
current: 1000 + Math.random() * 500,
change: changePercent * 10,
changePercent: changePercent,
volume: BigInt(Math.floor(Math.random() * 1000000000)),
turnover: BigInt(Math.floor(Math.random() * 10000000000)),
quoteTime: now,
},
});
} catch (error) {
logger.error(`Failed to sync sector quote for ${sector.code}:`, error);
}
}
logger.info('Sector quotes sync completed');
} catch (error) {
logger.error('Failed to sync sector quotes:', error);
throw error;
}
}
// 同步K线数据单只股票
async syncKLineData(stockCode: string, period: string = 'day'): Promise<void> {
try {
logger.info(`Starting K-line sync for ${stockCode} (${period})...`);
// 获取最新K线日期
const latestKLine = await prisma.stockKLine.findFirst({
where: { stockCode, period },
orderBy: { date: 'desc' },
});
const startDate = latestKLine
? new Date(latestKLine.date.getTime() + 24 * 60 * 60 * 1000)
: new Date(Date.now() - 365 * 24 * 60 * 60 * 1000);
const endDate = new Date();
// 使用模拟数据
const days = Math.min(100, Math.ceil((endDate.getTime() - startDate.getTime()) / (24 * 60 * 60 * 1000)));
for (let i = days; i >= 0; i--) {
const date = new Date(endDate);
date.setDate(date.getDate() - i);
const basePrice = 10 + Math.random() * 100;
const change = (Math.random() * 0.1 - 0.05);
const close = basePrice * (1 + change);
const open = basePrice * (1 + (Math.random() * 0.04 - 0.02));
const high = Math.max(open, close) * (1 + Math.random() * 0.02);
const low = Math.min(open, close) * (1 - Math.random() * 0.02);
try {
await prisma.stockKLine.upsert({
where: {
stockCode_period_date: {
stockCode,
period,
date,
},
},
update: {
open,
high,
low,
close,
volume: BigInt(Math.floor(Math.random() * 10000000)),
},
create: {
stockCode,
period,
date,
open,
high,
low,
close,
volume: BigInt(Math.floor(Math.random() * 10000000)),
},
});
} catch (error) {
logger.error(`Failed to upsert K-line for ${stockCode} on ${date}:`, error);
}
}
logger.info(`K-line sync completed for ${stockCode}`);
} catch (error) {
logger.error(`Failed to sync K-line for ${stockCode}:`, error);
throw error;
}
}
// 批量同步所有股票K线数据
async syncAllStocksKLine(period: string = 'day'): Promise<void> {
try {
logger.info(`Starting batch K-line sync for period: ${period}...`);
const stocks = await prisma.stock.findMany({
select: { code: true },
});
let successCount = 0;
let failCount = 0;
for (const stock of stocks) {
try {
await this.syncKLineData(stock.code, period);
successCount++;
await this.delay(100);
} catch (error) {
logger.error(`Failed to sync K-line for ${stock.code}:`, error);
failCount++;
}
}
logger.info(`Batch K-line sync completed: ${successCount} success, ${failCount} failed`);
} catch (error) {
logger.error('Failed to batch sync K-line data:', error);
throw error;
}
}
// 同步所有股票日行情数据(从外部数据源)
async syncAllStocks(): Promise<void> {
try {
logger.info('[syncAllStocks] 开始同步所有股票日行情数据...');
// 检查外部数据源是否可用
if (!externalDataSourceService.isEnabled()) {
throw new Error('数据源未配置,无法同步股票数据');
}
// 1. 获取本地数据库最新日期
const latestQuote = await prisma.stockDailyQuote.findFirst({
orderBy: { tradeDay: 'desc' },
select: { tradeDay: true },
});
const startDate = latestQuote?.tradeDay
? new Date(latestQuote.tradeDay.getTime() + 24 * 60 * 60 * 1000)
: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000); // 默认30天前
logger.info(`[syncAllStocks] 本地最新日期: ${latestQuote?.tradeDay?.toISOString().split('T')[0] || '无'}, 开始日期: ${startDate.toISOString().split('T')[0]}`);
// 2. 获取交易日历(返回交易日字符串数组 YYYYMMDD
const tradingDateList = await externalDataSourceService.getTradingDates(
startDate.toISOString().split('T')[0].replace(/-/g, ''),
new Date().toISOString().split('T')[0].replace(/-/g, '')
);
logger.info(`[syncAllStocks] 需要同步 ${tradingDateList.length} 个交易日: ${tradingDateList.join(', ')}`);
if (tradingDateList.length === 0) {
logger.info('[syncAllStocks] 没有需要同步的交易日');
return;
}
// 3. 获取所有股票代码
const symbols = await externalDataSourceService.getSymbols({
type: 'stock',
limit: 6000
});
// 保留完整的 symbol 信息,用于后续调用 API
const stockList = symbols
.filter(s => s.symbol_id && s.status === 'active') // 只保留活跃股票
.map(s => ({
code: s.symbol_id?.replace(/\.(SZ|SH|BJ)$/i, '') || '', // 纯代码,用于数据库
symbol_id: s.symbol_id || '', // 完整代码(带后缀),用于 API 调用
name: s.name || '',
exchange: s.exchange || '',
}))
.filter(s => s.code && s.symbol_id); // 过滤掉无效数据
logger.info(`[syncAllStocks] 获取到 ${stockList.length} 只活跃股票,样例: ${stockList.slice(0, 5).map(s => s.symbol_id).join(', ')}`);
// 4. 按日期遍历,批量获取并保存数据
let totalSyncedDays = 0;
let totalSyncedRecords = 0;
for (const tradeDate of tradingDateList) {
try {
logger.info(`[syncAllStocks] 同步 ${tradeDate} 的数据...`);
// 格式化日期 YYYY-MM-DD
const formattedDate = `${tradeDate.slice(0, 4)}-${tradeDate.slice(4, 6)}-${tradeDate.slice(6, 8)}`;
// 批量获取该日所有股票的行情数据
// 分批处理每批10只股票
const batchSize = 10;
let daySyncedCount = 0;
for (let i = 0; i < stockList.length; i += batchSize) {
const batch = stockList.slice(i, i + batchSize);
try {
// 调用外部数据源批量获取日行情
const quotes = await this.fetchDailyQuotes(batch, formattedDate);
// 批量保存到数据库
await this.saveDailyQuotes(quotes, formattedDate);
daySyncedCount += quotes.length;
totalSyncedRecords += quotes.length;
// 避免请求过快
await this.delay(200);
} catch (error) {
logger.error(`[syncAllStocks] 批量获取 ${formattedDate}${i/batchSize + 1} 批数据失败:`, error);
}
}
totalSyncedDays++;
logger.info(`[syncAllStocks] ${formattedDate} 同步完成: ${daySyncedCount} 条记录`);
} catch (error) {
logger.error(`[syncAllStocks] 同步 ${tradeDate} 失败:`, error);
}
}
logger.info(`[syncAllStocks] 同步完成: ${totalSyncedDays} 天, ${totalSyncedRecords} 条记录`);
} catch (error) {
logger.error('[syncAllStocks] 同步失败:', error);
throw new Error('数据源异常,股票同步失败: ' + (error as Error).message);
}
}
// 批量获取日行情数据(辅助方法)
private async fetchDailyQuotes(
stocks: { code: string; symbol_id: string; name: string; exchange: string }[],
tradeDate: string
): Promise<any[]> {
// 使用 K 线接口获取日线数据
const quotes: any[] = [];
for (const stock of stocks) {
try {
// 使用完整的 symbol_id带后缀如 000001.SZ调用 API
const klines = await externalDataSourceService.getKLines(stock.symbol_id, 'day', {
startDate: tradeDate.replace(/-/g, ''),
endDate: tradeDate.replace(/-/g, ''),
limit: 1,
});
if (klines && klines.length > 0) {
const k = klines[0];
// 使用纯代码(不带后缀)保存到数据库
quotes.push({
stockCode: stock.code,
stockName: stock.name,
open: k.open,
close: k.close,
high: k.high,
low: k.low,
volume: k.volume,
amount: k.amount || 0,
differrange: ((k.close - k.open) / k.open) * 100,
});
}
} catch (error) {
logger.warn(`[fetchDailyQuotes] 获取 ${stock.symbol_id} ${tradeDate} 数据失败`);
}
}
return quotes;
}
// 批量保存日行情数据(辅助方法)
private async saveDailyQuotes(quotes: any[], tradeDate: string): Promise<void> {
const date = new Date(tradeDate);
for (const quote of quotes) {
try {
await prisma.stockDailyQuote.upsert({
where: {
stockCode_tradeDay: {
stockCode: quote.stockCode,
tradeDay: date,
},
},
update: {
open: quote.open,
close: quote.close,
high: quote.high,
low: quote.low,
volume: BigInt(quote.volume || 0),
amount: BigInt(quote.amount || 0),
differrange: quote.differrange,
},
create: {
stockCode: quote.stockCode,
tradeDay: date,
open: quote.open,
close: quote.close,
high: quote.high,
low: quote.low,
volume: BigInt(quote.volume || 0),
amount: BigInt(quote.amount || 0),
differrange: quote.differrange,
},
});
} catch (error) {
logger.error(`[saveDailyQuotes] 保存 ${quote.stockCode} 失败:`, error);
}
}
}
// 同步版块数据
async syncSectors(): Promise<void> {
try {
logger.info('Starting sync sectors...');
const sectors = [
{ name: '半导体', code: '880491' },
{ name: '新能源', code: '880952' },
{ name: '医药生物', code: '880122' },
{ name: '白酒', code: '880952' },
{ name: '银行', code: '880471' },
{ name: '证券', code: '880472' },
{ name: '保险', code: '880473' },
{ name: '房地产', code: '880482' },
{ name: '汽车', code: '880391' },
{ name: '电子', code: '880494' },
];
for (const sector of sectors) {
try {
await prisma.sector.upsert({
where: { code: sector.code },
update: { name: sector.name },
create: sector,
});
} catch (error) {
logger.error(`Failed to upsert sector ${sector.code}:`, error);
}
}
logger.info(`Synced ${sectors.length} sectors`);
} catch (error) {
logger.error('Failed to sync sectors:', error);
throw error;
}
}
// 计算动量分数
async calculateMomentumScores(): Promise<void> {
try {
logger.info('Starting calculate momentum scores...');
const stocks = await prisma.stock.findMany({
include: {
klines: {
where: { period: 'day' },
orderBy: { date: 'desc' },
take: 20,
},
},
});
for (const stock of stocks) {
try {
if (stock.klines.length < 5) continue;
const prices = stock.klines.map(k => k.close).reverse();
const ma5 = prices.slice(-5).reduce((a, b) => a + b, 0) / 5;
const ma10 = prices.slice(-10).reduce((a, b) => a + b, 0) / 10;
const ma20 = prices.slice(-20).reduce((a, b) => a + b, 0) / 20;
const currentPrice = prices[prices.length - 1];
const priceChange = (currentPrice - prices[0]) / prices[0] * 100;
let score = 50;
if (currentPrice > ma5) score += 10;
if (currentPrice > ma10) score += 10;
if (currentPrice > ma20) score += 10;
if (priceChange > 5) score += 10;
if (priceChange > 10) score += 10;
const today = new Date();
today.setHours(0, 0, 0, 0);
await prisma.momentumStock.upsert({
where: {
stockCode_date: {
stockCode: stock.code,
date: today,
},
},
update: {
momentumScore: score,
},
create: {
stockCode: stock.code,
momentumScore: score,
volumeRatio: 1 + Math.random() * 2,
breakThrough: score > 70,
date: today,
},
});
} catch (error) {
logger.error(`Failed to calculate momentum for ${stock.code}:`, error);
}
}
logger.info('Momentum scores calculation completed');
} catch (error) {
logger.error('Failed to calculate momentum scores:', error);
throw error;
}
}
// 同步市场指数
async syncMarketIndices(): Promise<void> {
try {
logger.info('Starting sync market indices...');
const indices = [
{ name: '上证指数', code: '000001' },
{ name: '深证成指', code: '399001' },
{ name: '创业板指', code: '399006' },
];
for (const index of indices) {
try {
await prisma.marketIndex.upsert({
where: { code: index.code },
update: {
current: 3000 + Math.random() * 500,
change: Math.random() * 40 - 20,
changePercent: Math.random() * 2 - 1,
volume: BigInt(Math.floor(Math.random() * 1000000000)),
turnover: BigInt(Math.floor(Math.random() * 10000000000)),
},
create: {
name: index.name,
code: index.code,
current: 3000 + Math.random() * 500,
change: Math.random() * 40 - 20,
changePercent: Math.random() * 2 - 1,
volume: BigInt(Math.floor(Math.random() * 1000000000)),
turnover: BigInt(Math.floor(Math.random() * 10000000000)),
},
});
} catch (error) {
logger.error(`Failed to upsert market index ${index.code}:`, error);
}
}
logger.info('Market indices sync completed');
} catch (error) {
logger.error('Failed to sync market indices:', error);
throw error;
}
}
// 初始化基础数据
async initBaseData(): Promise<void> {
try {
logger.info('Initializing base data...');
// 检查外部数据源是否可用
if (externalDataSourceService.isEnabled()) {
try {
// 同步股票
await this.syncAllStocks();
} catch (error) {
logger.warn('Stock sync failed during init, will retry later:', error);
}
} else {
logger.warn('External data source not enabled, skipping stock sync during init');
}
// 同步版块
await this.syncSectors();
// 同步市场指数
await this.syncMarketIndices();
logger.info('Base data initialization completed');
} catch (error) {
logger.error('Failed to initialize base data:', error);
}
}
}
export const dataSyncService = new DataSyncService();