""" 数据同步服务 定时从 amazingData 同步数据到 TimescaleDB """ import logging from datetime import datetime, date, timedelta from typing import List, Optional import asyncio from sqlalchemy import text from sqlalchemy.orm import Session from app.db.init_db import TimescaleSessionLocal from app.services.amazing_data_service import amazing_data_service logger = logging.getLogger(__name__) class DataSyncService: """数据同步服务""" # 默认同步的品种列表 DEFAULT_SYMBOLS = [ "IF2406", # 沪深 300 股指期货 "IC2406", # 中证 500 股指期货 "IH2406", # 上证 50 股指期货 "IM2406", # 中证 1000 股指期货 ] # 默认同步的周期 DEFAULT_PERIODS = ["1m", "5m", "15m", "30m", "1h", "1d"] @staticmethod def sync_kline_data( symbol: str, period: str, start_date: Optional[str] = None, end_date: Optional[str] = None ) -> int: """ 同步 K 线数据到 TimescaleDB Args: symbol: 品种代码 period: 周期 start_date: 开始日期 (YYYY-MM-DD),默认昨天 end_date: 结束日期 (YYYY-MM-DD),默认今天 Returns: int: 同步的记录数 """ try: # 默认日期范围 if not end_date: end_date = datetime.now().strftime("%Y-%m-%d") if not start_date: start_date = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d") logger.info(f"Syncing kline data: {symbol} {period} from {start_date} to {end_date}") # 从 amazingData 获取数据 kline_data = amazing_data_service.get_kline_data( symbol=symbol, period=period, start_date=start_date, end_date=end_date ) if not kline_data: logger.warning(f"No data to sync for {symbol} {period}") return 0 # 插入到 TimescaleDB count = 0 with TimescaleSessionLocal() as db: for record in kline_data: try: query = text(""" INSERT INTO kline_data (time, symbol, period, open, high, low, close, volume, amount, open_interest) VALUES (:time, :symbol, :period, :open, :high, :low, :close, :volume, :amount, :open_interest) ON CONFLICT (time, symbol, period) DO UPDATE SET open = EXCLUDED.open, high = EXCLUDED.high, low = EXCLUDED.low, close = EXCLUDED.close, volume = EXCLUDED.volume, amount = EXCLUDED.amount, open_interest = EXCLUDED.open_interest """) # 处理时间字段 time_val = record.get('time') if isinstance(time_val, str): time_val = datetime.fromisoformat(time_val.replace('Z', '+00:00')) db.execute( query, { 'time': time_val, 'symbol': symbol, 'period': period, 'open': float(record.get('open', 0)), 'high': float(record.get('high', 0)), 'low': float(record.get('low', 0)), 'close': float(record.get('close', 0)), 'volume': float(record.get('volume', 0)), 'amount': float(record.get('amount', 0)), 'open_interest': float(record.get('open_interest', 0)) } ) count += 1 except Exception as e: logger.error(f"Error inserting record: {e}") continue db.commit() logger.info(f"Synced {count} records for {symbol} {period}") return count except Exception as e: logger.error(f"Error syncing data for {symbol} {period}: {e}") raise @staticmethod async def sync_all_symbols( symbols: Optional[List[str]] = None, periods: Optional[List[str]] = None ) -> dict: """ 异步同步所有品种数据 Args: symbols: 品种列表,默认使用 DEFAULT_SYMBOLS periods: 周期列表,默认使用 DEFAULT_PERIODS Returns: dict: 同步结果统计 """ symbols = symbols or DataSyncService.DEFAULT_SYMBOLS periods = periods or DataSyncService.DEFAULT_PERIODS results = {} total_count = 0 # 确保连接 if not amazing_data_service.ensure_connected(): logger.error("Failed to connect to amazingData") return {'error': 'Connection failed', 'total': 0} for symbol in symbols: for period in periods: try: count = DataSyncService.sync_kline_data(symbol, period) results[f"{symbol}_{period}"] = count total_count += count # 避免请求过快 await asyncio.sleep(0.5) except Exception as e: logger.error(f"Failed to sync {symbol} {period}: {e}") results[f"{symbol}_{period}"] = 0 return { 'success': True, 'total_records': total_count, 'details': results } @staticmethod def sync_realtime_quotes(symbols: List[str]) -> int: """ 同步实时行情到 Redis Args: symbols: 品种列表 Returns: int: 同步的记录数 """ try: quotes = amazing_data_service.get_realtime_quotes(symbols) if not quotes: return 0 # 这里可以写入 Redis 缓存 # 具体实现取决于 Redis 服务的设计 logger.info(f"Synced {len(quotes)} realtime quotes") return len(quotes) except Exception as e: logger.error(f"Error syncing realtime quotes: {e}") raise # 便捷函数 def sync_kline(symbol: str, period: str, start_date: str = None, end_date: str = None) -> int: """同步 K 线数据""" return DataSyncService.sync_kline_data(symbol, period, start_date, end_date) async def sync_all() -> dict: """同步所有数据""" return await DataSyncService.sync_all_symbols()