|
|
import prisma from '../config/database';
|
|
|
import { cache } from '../config/redis';
|
|
|
import logger from '../utils/logger';
|
|
|
import { externalDataSourceService } from './externalDataSourceService';
|
|
|
|
|
|
export class DataSyncService {
|
|
|
// 延迟辅助函数
|
|
|
private delay(ms: number): Promise<void> {
|
|
|
return new Promise((resolve) => setTimeout(resolve, ms));
|
|
|
}
|
|
|
|
|
|
// 同步实时行情(模拟数据)
|
|
|
async syncRealTimeQuotes(): Promise<void> {
|
|
|
try {
|
|
|
logger.info('Starting real-time quotes sync...');
|
|
|
|
|
|
const stocks = await prisma.stock.findMany();
|
|
|
const now = new Date();
|
|
|
|
|
|
for (const stock of stocks.slice(0, 100)) {
|
|
|
try {
|
|
|
const basePrice = 10 + Math.random() * 100;
|
|
|
const changePercent = (Math.random() * 10 - 5);
|
|
|
const price = basePrice * (1 + changePercent / 100);
|
|
|
|
|
|
// 使用 create,因为每次时间都不一样
|
|
|
await prisma.stockQuote.create({
|
|
|
data: {
|
|
|
stockCode: stock.code,
|
|
|
price: price,
|
|
|
open: basePrice * (1 + (Math.random() * 4 - 2) / 100),
|
|
|
high: price * (1 + Math.random() * 0.02),
|
|
|
low: price * (1 - Math.random() * 0.02),
|
|
|
preClose: basePrice,
|
|
|
volume: BigInt(Math.floor(Math.random() * 10000000)),
|
|
|
turnover: BigInt(Math.floor(Math.random() * 100000000)),
|
|
|
changePercent: changePercent,
|
|
|
quoteTime: now,
|
|
|
},
|
|
|
});
|
|
|
} catch (error) {
|
|
|
logger.error(`Failed to sync quote for ${stock.code}:`, error);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
logger.info('Real-time quotes sync completed');
|
|
|
} catch (error) {
|
|
|
logger.error('Failed to sync real-time quotes:', error);
|
|
|
throw error;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// 同步版块行情(模拟数据)
|
|
|
async syncSectorQuotes(): Promise<void> {
|
|
|
try {
|
|
|
logger.info('Starting sector quotes sync...');
|
|
|
|
|
|
const sectors = await prisma.sector.findMany();
|
|
|
const now = new Date();
|
|
|
|
|
|
for (const sector of sectors) {
|
|
|
try {
|
|
|
const changePercent = Math.random() * 6 - 3;
|
|
|
|
|
|
await prisma.sectorQuote.create({
|
|
|
data: {
|
|
|
sectorCode: sector.code,
|
|
|
current: 1000 + Math.random() * 500,
|
|
|
change: changePercent * 10,
|
|
|
changePercent: changePercent,
|
|
|
volume: BigInt(Math.floor(Math.random() * 1000000000)),
|
|
|
turnover: BigInt(Math.floor(Math.random() * 10000000000)),
|
|
|
quoteTime: now,
|
|
|
},
|
|
|
});
|
|
|
} catch (error) {
|
|
|
logger.error(`Failed to sync sector quote for ${sector.code}:`, error);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
logger.info('Sector quotes sync completed');
|
|
|
} catch (error) {
|
|
|
logger.error('Failed to sync sector quotes:', error);
|
|
|
throw error;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// 同步K线数据(单只股票)
|
|
|
async syncKLineData(stockCode: string, period: string = 'day'): Promise<void> {
|
|
|
try {
|
|
|
logger.info(`Starting K-line sync for ${stockCode} (${period})...`);
|
|
|
|
|
|
// 获取最新K线日期
|
|
|
const latestKLine = await prisma.stockKLine.findFirst({
|
|
|
where: { stockCode, period },
|
|
|
orderBy: { date: 'desc' },
|
|
|
});
|
|
|
|
|
|
const startDate = latestKLine
|
|
|
? new Date(latestKLine.date.getTime() + 24 * 60 * 60 * 1000)
|
|
|
: new Date(Date.now() - 365 * 24 * 60 * 60 * 1000);
|
|
|
|
|
|
const endDate = new Date();
|
|
|
|
|
|
// 使用模拟数据
|
|
|
const days = Math.min(100, Math.ceil((endDate.getTime() - startDate.getTime()) / (24 * 60 * 60 * 1000)));
|
|
|
|
|
|
for (let i = days; i >= 0; i--) {
|
|
|
const date = new Date(endDate);
|
|
|
date.setDate(date.getDate() - i);
|
|
|
|
|
|
const basePrice = 10 + Math.random() * 100;
|
|
|
const change = (Math.random() * 0.1 - 0.05);
|
|
|
const close = basePrice * (1 + change);
|
|
|
const open = basePrice * (1 + (Math.random() * 0.04 - 0.02));
|
|
|
const high = Math.max(open, close) * (1 + Math.random() * 0.02);
|
|
|
const low = Math.min(open, close) * (1 - Math.random() * 0.02);
|
|
|
|
|
|
try {
|
|
|
await prisma.stockKLine.upsert({
|
|
|
where: {
|
|
|
stockCode_period_date: {
|
|
|
stockCode,
|
|
|
period,
|
|
|
date,
|
|
|
},
|
|
|
},
|
|
|
update: {
|
|
|
open,
|
|
|
high,
|
|
|
low,
|
|
|
close,
|
|
|
volume: BigInt(Math.floor(Math.random() * 10000000)),
|
|
|
},
|
|
|
create: {
|
|
|
stockCode,
|
|
|
period,
|
|
|
date,
|
|
|
open,
|
|
|
high,
|
|
|
low,
|
|
|
close,
|
|
|
volume: BigInt(Math.floor(Math.random() * 10000000)),
|
|
|
},
|
|
|
});
|
|
|
} catch (error) {
|
|
|
logger.error(`Failed to upsert K-line for ${stockCode} on ${date}:`, error);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
logger.info(`K-line sync completed for ${stockCode}`);
|
|
|
} catch (error) {
|
|
|
logger.error(`Failed to sync K-line for ${stockCode}:`, error);
|
|
|
throw error;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// 批量同步所有股票K线数据
|
|
|
async syncAllStocksKLine(period: string = 'day'): Promise<void> {
|
|
|
try {
|
|
|
logger.info(`Starting batch K-line sync for period: ${period}...`);
|
|
|
|
|
|
const stocks = await prisma.stock.findMany({
|
|
|
select: { code: true },
|
|
|
});
|
|
|
|
|
|
let successCount = 0;
|
|
|
let failCount = 0;
|
|
|
|
|
|
for (const stock of stocks) {
|
|
|
try {
|
|
|
await this.syncKLineData(stock.code, period);
|
|
|
successCount++;
|
|
|
|
|
|
await this.delay(100);
|
|
|
} catch (error) {
|
|
|
logger.error(`Failed to sync K-line for ${stock.code}:`, error);
|
|
|
failCount++;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
logger.info(`Batch K-line sync completed: ${successCount} success, ${failCount} failed`);
|
|
|
} catch (error) {
|
|
|
logger.error('Failed to batch sync K-line data:', error);
|
|
|
throw error;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// 同步所有股票日行情数据(从外部数据源)
|
|
|
async syncAllStocks(): Promise<void> {
|
|
|
try {
|
|
|
logger.info('[syncAllStocks] 开始同步所有股票日行情数据...');
|
|
|
|
|
|
// 检查外部数据源是否可用
|
|
|
if (!externalDataSourceService.isEnabled()) {
|
|
|
throw new Error('数据源未配置,无法同步股票数据');
|
|
|
}
|
|
|
|
|
|
// 1. 获取本地数据库最新日期
|
|
|
const latestQuote = await prisma.stockDailyQuote.findFirst({
|
|
|
orderBy: { tradeDay: 'desc' },
|
|
|
select: { tradeDay: true },
|
|
|
});
|
|
|
|
|
|
const startDate = latestQuote?.tradeDay
|
|
|
? new Date(latestQuote.tradeDay.getTime() + 24 * 60 * 60 * 1000)
|
|
|
: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000); // 默认30天前
|
|
|
|
|
|
logger.info(`[syncAllStocks] 本地最新日期: ${latestQuote?.tradeDay?.toISOString().split('T')[0] || '无'}, 开始日期: ${startDate.toISOString().split('T')[0]}`);
|
|
|
|
|
|
// 2. 获取交易日历(返回交易日字符串数组 YYYYMMDD)
|
|
|
const tradingDateList = await externalDataSourceService.getTradingDates(
|
|
|
startDate.toISOString().split('T')[0].replace(/-/g, ''),
|
|
|
new Date().toISOString().split('T')[0].replace(/-/g, '')
|
|
|
);
|
|
|
|
|
|
logger.info(`[syncAllStocks] 需要同步 ${tradingDateList.length} 个交易日: ${tradingDateList.join(', ')}`);
|
|
|
|
|
|
if (tradingDateList.length === 0) {
|
|
|
logger.info('[syncAllStocks] 没有需要同步的交易日');
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
// 3. 获取所有股票代码
|
|
|
const symbols = await externalDataSourceService.getSymbols({
|
|
|
type: 'stock',
|
|
|
limit: 6000
|
|
|
});
|
|
|
|
|
|
// 保留完整的 symbol 信息,用于后续调用 API
|
|
|
const stockList = symbols
|
|
|
.filter(s => s.symbol_id && s.status === 'active') // 只保留活跃股票
|
|
|
.map(s => ({
|
|
|
code: s.symbol_id?.replace(/\.(SZ|SH|BJ)$/i, '') || '', // 纯代码,用于数据库
|
|
|
symbol_id: s.symbol_id || '', // 完整代码(带后缀),用于 API 调用
|
|
|
name: s.name || '',
|
|
|
exchange: s.exchange || '',
|
|
|
}))
|
|
|
.filter(s => s.code && s.symbol_id); // 过滤掉无效数据
|
|
|
|
|
|
logger.info(`[syncAllStocks] 获取到 ${stockList.length} 只活跃股票,样例: ${stockList.slice(0, 5).map(s => s.symbol_id).join(', ')}`);
|
|
|
|
|
|
// 4. 按日期遍历,批量获取并保存数据
|
|
|
let totalSyncedDays = 0;
|
|
|
let totalSyncedRecords = 0;
|
|
|
|
|
|
for (const tradeDate of tradingDateList) {
|
|
|
try {
|
|
|
logger.info(`[syncAllStocks] 同步 ${tradeDate} 的数据...`);
|
|
|
|
|
|
// 格式化日期 YYYY-MM-DD
|
|
|
const formattedDate = `${tradeDate.slice(0, 4)}-${tradeDate.slice(4, 6)}-${tradeDate.slice(6, 8)}`;
|
|
|
|
|
|
// 批量获取该日所有股票的行情数据
|
|
|
// 分批处理,每批10只股票
|
|
|
const batchSize = 10;
|
|
|
let daySyncedCount = 0;
|
|
|
|
|
|
for (let i = 0; i < stockList.length; i += batchSize) {
|
|
|
const batch = stockList.slice(i, i + batchSize);
|
|
|
|
|
|
try {
|
|
|
// 调用外部数据源批量获取日行情
|
|
|
const quotes = await this.fetchDailyQuotes(batch, formattedDate);
|
|
|
|
|
|
// 批量保存到数据库
|
|
|
await this.saveDailyQuotes(quotes, formattedDate);
|
|
|
|
|
|
daySyncedCount += quotes.length;
|
|
|
totalSyncedRecords += quotes.length;
|
|
|
|
|
|
// 避免请求过快
|
|
|
await this.delay(200);
|
|
|
} catch (error) {
|
|
|
logger.error(`[syncAllStocks] 批量获取 ${formattedDate} 第 ${i/batchSize + 1} 批数据失败:`, error);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
totalSyncedDays++;
|
|
|
logger.info(`[syncAllStocks] ${formattedDate} 同步完成: ${daySyncedCount} 条记录`);
|
|
|
|
|
|
} catch (error) {
|
|
|
logger.error(`[syncAllStocks] 同步 ${tradeDate} 失败:`, error);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
logger.info(`[syncAllStocks] 同步完成: ${totalSyncedDays} 天, ${totalSyncedRecords} 条记录`);
|
|
|
} catch (error) {
|
|
|
logger.error('[syncAllStocks] 同步失败:', error);
|
|
|
throw new Error('数据源异常,股票同步失败: ' + (error as Error).message);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// 批量获取日行情数据(辅助方法)
|
|
|
private async fetchDailyQuotes(
|
|
|
stocks: { code: string; symbol_id: string; name: string; exchange: string }[],
|
|
|
tradeDate: string
|
|
|
): Promise<any[]> {
|
|
|
// 使用 K 线接口获取日线数据
|
|
|
const quotes: any[] = [];
|
|
|
|
|
|
for (const stock of stocks) {
|
|
|
try {
|
|
|
// 使用完整的 symbol_id(带后缀,如 000001.SZ)调用 API
|
|
|
const klines = await externalDataSourceService.getKLines(stock.symbol_id, 'day', {
|
|
|
startDate: tradeDate.replace(/-/g, ''),
|
|
|
endDate: tradeDate.replace(/-/g, ''),
|
|
|
limit: 1,
|
|
|
});
|
|
|
|
|
|
if (klines && klines.length > 0) {
|
|
|
const k = klines[0];
|
|
|
// 使用纯代码(不带后缀)保存到数据库
|
|
|
quotes.push({
|
|
|
stockCode: stock.code,
|
|
|
stockName: stock.name,
|
|
|
open: k.open,
|
|
|
close: k.close,
|
|
|
high: k.high,
|
|
|
low: k.low,
|
|
|
volume: k.volume,
|
|
|
amount: k.amount || 0,
|
|
|
differrange: ((k.close - k.open) / k.open) * 100,
|
|
|
});
|
|
|
}
|
|
|
} catch (error) {
|
|
|
logger.warn(`[fetchDailyQuotes] 获取 ${stock.symbol_id} ${tradeDate} 数据失败`);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
return quotes;
|
|
|
}
|
|
|
|
|
|
// 批量保存日行情数据(辅助方法)
|
|
|
private async saveDailyQuotes(quotes: any[], tradeDate: string): Promise<void> {
|
|
|
const date = new Date(tradeDate);
|
|
|
|
|
|
for (const quote of quotes) {
|
|
|
try {
|
|
|
await prisma.stockDailyQuote.upsert({
|
|
|
where: {
|
|
|
stockCode_tradeDay: {
|
|
|
stockCode: quote.stockCode,
|
|
|
tradeDay: date,
|
|
|
},
|
|
|
},
|
|
|
update: {
|
|
|
open: quote.open,
|
|
|
close: quote.close,
|
|
|
high: quote.high,
|
|
|
low: quote.low,
|
|
|
volume: BigInt(quote.volume || 0),
|
|
|
amount: BigInt(quote.amount || 0),
|
|
|
differrange: quote.differrange,
|
|
|
},
|
|
|
create: {
|
|
|
stockCode: quote.stockCode,
|
|
|
tradeDay: date,
|
|
|
open: quote.open,
|
|
|
close: quote.close,
|
|
|
high: quote.high,
|
|
|
low: quote.low,
|
|
|
volume: BigInt(quote.volume || 0),
|
|
|
amount: BigInt(quote.amount || 0),
|
|
|
differrange: quote.differrange,
|
|
|
},
|
|
|
});
|
|
|
} catch (error) {
|
|
|
logger.error(`[saveDailyQuotes] 保存 ${quote.stockCode} 失败:`, error);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// 同步版块数据
|
|
|
async syncSectors(): Promise<void> {
|
|
|
try {
|
|
|
logger.info('Starting sync sectors...');
|
|
|
|
|
|
const sectors = [
|
|
|
{ name: '半导体', code: '880491' },
|
|
|
{ name: '新能源', code: '880952' },
|
|
|
{ name: '医药生物', code: '880122' },
|
|
|
{ name: '白酒', code: '880952' },
|
|
|
{ name: '银行', code: '880471' },
|
|
|
{ name: '证券', code: '880472' },
|
|
|
{ name: '保险', code: '880473' },
|
|
|
{ name: '房地产', code: '880482' },
|
|
|
{ name: '汽车', code: '880391' },
|
|
|
{ name: '电子', code: '880494' },
|
|
|
];
|
|
|
|
|
|
for (const sector of sectors) {
|
|
|
try {
|
|
|
await prisma.sector.upsert({
|
|
|
where: { code: sector.code },
|
|
|
update: { name: sector.name },
|
|
|
create: sector,
|
|
|
});
|
|
|
} catch (error) {
|
|
|
logger.error(`Failed to upsert sector ${sector.code}:`, error);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
logger.info(`Synced ${sectors.length} sectors`);
|
|
|
} catch (error) {
|
|
|
logger.error('Failed to sync sectors:', error);
|
|
|
throw error;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// 计算动量分数
|
|
|
async calculateMomentumScores(): Promise<void> {
|
|
|
try {
|
|
|
logger.info('Starting calculate momentum scores...');
|
|
|
|
|
|
const stocks = await prisma.stock.findMany({
|
|
|
include: {
|
|
|
klines: {
|
|
|
where: { period: 'day' },
|
|
|
orderBy: { date: 'desc' },
|
|
|
take: 20,
|
|
|
},
|
|
|
},
|
|
|
});
|
|
|
|
|
|
for (const stock of stocks) {
|
|
|
try {
|
|
|
if (stock.klines.length < 5) continue;
|
|
|
|
|
|
const prices = stock.klines.map(k => k.close).reverse();
|
|
|
const ma5 = prices.slice(-5).reduce((a, b) => a + b, 0) / 5;
|
|
|
const ma10 = prices.slice(-10).reduce((a, b) => a + b, 0) / 10;
|
|
|
const ma20 = prices.slice(-20).reduce((a, b) => a + b, 0) / 20;
|
|
|
|
|
|
const currentPrice = prices[prices.length - 1];
|
|
|
const priceChange = (currentPrice - prices[0]) / prices[0] * 100;
|
|
|
|
|
|
let score = 50;
|
|
|
if (currentPrice > ma5) score += 10;
|
|
|
if (currentPrice > ma10) score += 10;
|
|
|
if (currentPrice > ma20) score += 10;
|
|
|
if (priceChange > 5) score += 10;
|
|
|
if (priceChange > 10) score += 10;
|
|
|
|
|
|
const today = new Date();
|
|
|
today.setHours(0, 0, 0, 0);
|
|
|
|
|
|
await prisma.momentumStock.upsert({
|
|
|
where: {
|
|
|
stockCode_date: {
|
|
|
stockCode: stock.code,
|
|
|
date: today,
|
|
|
},
|
|
|
},
|
|
|
update: {
|
|
|
momentumScore: score,
|
|
|
},
|
|
|
create: {
|
|
|
stockCode: stock.code,
|
|
|
momentumScore: score,
|
|
|
volumeRatio: 1 + Math.random() * 2,
|
|
|
breakThrough: score > 70,
|
|
|
date: today,
|
|
|
},
|
|
|
});
|
|
|
} catch (error) {
|
|
|
logger.error(`Failed to calculate momentum for ${stock.code}:`, error);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
logger.info('Momentum scores calculation completed');
|
|
|
} catch (error) {
|
|
|
logger.error('Failed to calculate momentum scores:', error);
|
|
|
throw error;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// 同步市场指数
|
|
|
async syncMarketIndices(): Promise<void> {
|
|
|
try {
|
|
|
logger.info('Starting sync market indices...');
|
|
|
|
|
|
const indices = [
|
|
|
{ name: '上证指数', code: '000001' },
|
|
|
{ name: '深证成指', code: '399001' },
|
|
|
{ name: '创业板指', code: '399006' },
|
|
|
];
|
|
|
|
|
|
for (const index of indices) {
|
|
|
try {
|
|
|
await prisma.marketIndex.upsert({
|
|
|
where: { code: index.code },
|
|
|
update: {
|
|
|
current: 3000 + Math.random() * 500,
|
|
|
change: Math.random() * 40 - 20,
|
|
|
changePercent: Math.random() * 2 - 1,
|
|
|
volume: BigInt(Math.floor(Math.random() * 1000000000)),
|
|
|
turnover: BigInt(Math.floor(Math.random() * 10000000000)),
|
|
|
},
|
|
|
create: {
|
|
|
name: index.name,
|
|
|
code: index.code,
|
|
|
current: 3000 + Math.random() * 500,
|
|
|
change: Math.random() * 40 - 20,
|
|
|
changePercent: Math.random() * 2 - 1,
|
|
|
volume: BigInt(Math.floor(Math.random() * 1000000000)),
|
|
|
turnover: BigInt(Math.floor(Math.random() * 10000000000)),
|
|
|
},
|
|
|
});
|
|
|
} catch (error) {
|
|
|
logger.error(`Failed to upsert market index ${index.code}:`, error);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
logger.info('Market indices sync completed');
|
|
|
} catch (error) {
|
|
|
logger.error('Failed to sync market indices:', error);
|
|
|
throw error;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// 初始化基础数据
|
|
|
async initBaseData(): Promise<void> {
|
|
|
try {
|
|
|
logger.info('Initializing base data...');
|
|
|
|
|
|
// 检查外部数据源是否可用
|
|
|
if (externalDataSourceService.isEnabled()) {
|
|
|
try {
|
|
|
// 同步股票
|
|
|
await this.syncAllStocks();
|
|
|
} catch (error) {
|
|
|
logger.warn('Stock sync failed during init, will retry later:', error);
|
|
|
}
|
|
|
} else {
|
|
|
logger.warn('External data source not enabled, skipping stock sync during init');
|
|
|
}
|
|
|
|
|
|
// 同步版块
|
|
|
await this.syncSectors();
|
|
|
|
|
|
// 同步市场指数
|
|
|
await this.syncMarketIndices();
|
|
|
|
|
|
logger.info('Base data initialization completed');
|
|
|
} catch (error) {
|
|
|
logger.error('Failed to initialize base data:', error);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
export const dataSyncService = new DataSyncService();
|