""" 定时数据同步任务 使用 APScheduler 定时同步数据 """ import logging from datetime import datetime from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger from app.services.data_sync_service import DataSyncService from app.services.amazing_data_service import amazing_data_service logger = logging.getLogger(__name__) # 全局调度器 scheduler = AsyncIOScheduler() async def sync_kline_task(): """定时同步 K 线数据任务""" try: logger.info("Starting scheduled kline data sync") # 确保连接 if not amazing_data_service.ensure_connected(): logger.error("Failed to connect to amazingData for scheduled sync") return # 同步默认品种和周期 result = await DataSyncService.sync_all_symbols() logger.info(f"Scheduled sync completed: {result}") except Exception as e: logger.error(f"Scheduled sync failed: {e}") async def sync_realtime_task(): """定时同步实时行情任务""" try: logger.info("Starting scheduled realtime quote sync") symbols = DataSyncService.DEFAULT_SYMBOLS count = DataSyncService.sync_realtime_quotes(symbols) logger.info(f"Synced {count} realtime quotes") except Exception as e: logger.error(f"Realtime sync failed: {e}") def start_scheduler(): """启动定时任务调度器""" try: # 连接 amazingData(长连接) amazing_data_service.connect() # K 线数据同步:每分钟执行一次 scheduler.add_job( sync_kline_task, trigger=CronTrigger(second=0), # 每分钟的第 0 秒 id='sync_kline', name='Sync Kline Data', replace_existing=True ) # 实时行情同步:每 5 秒执行一次 scheduler.add_job( sync_realtime_task, trigger='interval', seconds=5, id='sync_realtime', name='Sync Realtime Quotes', replace_existing=True ) scheduler.start() logger.info("Scheduler started successfully") except Exception as e: logger.error(f"Failed to start scheduler: {e}") raise def stop_scheduler(): """停止定时任务调度器""" try: scheduler.shutdown() logger.info("Scheduler stopped") # 断开 amazingData 连接 amazing_data_service.disconnect() except Exception as e: logger.error(f"Error stopping scheduler: {e}") def get_scheduler() -> AsyncIOScheduler: """获取调度器实例""" return scheduler