"""
Scheduling service: manages the periodic data-collection jobs with APScheduler.
"""
import logging
import re
from datetime import datetime, timedelta
from typing import Dict, Optional, Tuple

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.util import astimezone
from sqlalchemy.orm import Session

from app.database import SessionLocal
from app.services.collector import fetch_symbol_data
from app.services.cache import save_market_data, update_task_status
from app.config import SCHEDULER_MAX_INSTANCES, MAX_WORKERS

logger = logging.getLogger(__name__)

# Time zone in which "daily" and "once" run times (HH:MM) are interpreted.
_SCHEDULE_TIMEZONE = "Asia/Shanghai"

# Accepted textual form of a run time: exactly two digits, a colon, two digits.
_RUN_TIME_PATTERN = re.compile(r'^\d{2}:\d{2}$')

scheduler = BackgroundScheduler(
    executors={"default": ThreadPoolExecutor(max_workers=MAX_WORKERS)},
    job_defaults={
        "max_instances": SCHEDULER_MAX_INSTANCES,
        "misfire_grace_time": 60,
    },
)


def job_handler(task_id: int) -> None:
    """Run one collection pass for the task identified by ``task_id``.

    A fresh DB session is created for every invocation (and closed in
    ``finally``) so that sessions are never shared between worker threads.

    Args:
        task_id: Primary key of the task to execute.
    """
    db: Session = SessionLocal()
    try:
        # NOTE(review): function-scope import kept as-is; the same module is
        # already imported at module level, so this can probably be hoisted --
        # confirm there is no import cycle first.
        from app.services.cache import get_task

        task = get_task(db, task_id)
        if not task or not task.enabled:
            # Only this run is skipped; the job is not unregistered here.
            logger.warning(f"任务 {task_id} 不存在或已禁用,停止执行")
            return

        periods = task.periods.split(",") if task.periods else []
        logger.info(f"[定时任务] 开始采集 {task.symbol} (periods={periods})")

        result = fetch_symbol_data(
            symbol=task.symbol,
            data_type=task.data_type,
            periods=periods,
            max_workers=MAX_WORKERS,
        )

        if result.get("timeframes"):
            save_market_data(db, task.symbol, result)
            update_task_status(db, task_id, "success")
            logger.info(f"[定时任务] {task.symbol} 采集成功")

            # A one-shot task is marked finished, disabled, and dropped from
            # the scheduler (remove_job is a no-op if the job is already gone).
            if task.task_type == "once":
                task.is_finished = True
                task.enabled = False
                db.commit()
                remove_job(task_id)
                logger.info(f"[定时任务] {task.symbol} 一次性任务已完成")
        else:
            update_task_status(db, task_id, "failed")
            logger.error(f"[定时任务] {task.symbol} 采集失败: {result.get('error')}")
    except Exception as e:
        # logger.exception keeps the ERROR level and adds the traceback.
        logger.exception(f"[定时任务] 执行异常 task_id={task_id}: {e}")
        try:
            # A failed flush/commit leaves the session unusable until it is
            # rolled back; without this the status update below fails too.
            db.rollback()
            update_task_status(db, task_id, "failed")
        except Exception:
            # Best effort: never let bookkeeping crash the worker, but do
            # leave a trace instead of swallowing the error silently.
            logger.exception(f"[定时任务] 标记失败状态时出错 task_id={task_id}")
    finally:
        db.close()


def start_scheduler() -> None:
    """Start the background scheduler if it is not already running."""
    if not scheduler.running:
        scheduler.start()
        logger.info("调度器已启动")


def stop_scheduler() -> None:
    """Shut the scheduler down (without waiting for running jobs) if it is running."""
    if scheduler.running:
        scheduler.shutdown(wait=False)
        logger.info("调度器已停止")


def _parse_run_time(run_time: str) -> Tuple[int, int]:
    """Parse an ``HH:MM`` string into ``(hour, minute)``; raise ValueError if invalid."""
    if not _RUN_TIME_PATTERN.match(run_time):
        raise ValueError(f"run_time格式错误: {run_time}, 应为 HH:MM")
    hour, minute = map(int, run_time.split(":"))
    if not (0 <= hour <= 23 and 0 <= minute <= 59):
        raise ValueError(f"run_time值无效: {run_time}, 小时应为0-23, 分钟应为0-59")
    return hour, minute


def _next_run_datetime(hour: int, minute: int) -> datetime:
    """Return the next naive wall-clock ``hour:minute`` in the schedule time zone."""
    # Read the clock in the schedule time zone rather than the server's local
    # zone, then drop tzinfo: the trigger localizes the naive value itself
    # through its ``timezone`` argument.
    now = datetime.now(astimezone(_SCHEDULE_TIMEZONE)).replace(tzinfo=None)
    run_dt = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if run_dt <= now:
        # Already passed today: schedule for tomorrow. timedelta arithmetic
        # handles month/year rollover.
        run_dt += timedelta(days=1)
    return run_dt


def _build_trigger(task_type: str, run_time: Optional[str], interval_seconds: int):
    """Pick the trigger for a task; raise ValueError on a malformed ``run_time``."""
    if task_type == "daily" and run_time:
        hour, minute = _parse_run_time(run_time)
        return CronTrigger(hour=hour, minute=minute, timezone=_SCHEDULE_TIMEZONE)
    if task_type == "once" and run_time:
        hour, minute = _parse_run_time(run_time)
        return DateTrigger(
            run_date=_next_run_datetime(hour, minute),
            timezone=_SCHEDULE_TIMEZONE,
        )
    # Default (including daily/once without a run_time): fixed interval.
    return IntervalTrigger(seconds=interval_seconds)


def add_job(task_id: int, interval_seconds: int, task_type: str = "interval", run_time: Optional[str] = None) -> str:
    """Register (or re-register) the scheduler job for a task.

    Args:
        task_id: Task ID; the job ID is derived from it as ``task_<task_id>``.
        interval_seconds: Interval in seconds, used for interval tasks and as
            the fallback when ``run_time`` is missing or invalid.
        task_type: One of ``"interval"``, ``"daily"`` or ``"once"``.
        run_time: Time of day as ``HH:MM`` (used by ``daily`` and ``once``),
            interpreted in the Asia/Shanghai time zone.

    Returns:
        The job ID.
    """
    job_id = f"task_{task_id}"

    # Drop any previous registration of this task first.
    if scheduler.get_job(job_id):
        scheduler.remove_job(job_id)

    try:
        trigger = _build_trigger(task_type, run_time, interval_seconds)
    except ValueError as e:
        # A bad time configuration degrades to the plain interval trigger
        # instead of failing the registration.
        logger.error(f"任务 {task_id} 时间配置错误: {e}, 使用默认间隔触发器")
        trigger = IntervalTrigger(seconds=interval_seconds)

    scheduler.add_job(
        func=job_handler,
        trigger=trigger,
        args=[task_id],
        id=job_id,
        name=f"auto_collect_{task_id}",
        replace_existing=True,
    )
    logger.info(f"已添加定时任务: job_id={job_id}, type={task_type}, trigger={trigger}")
    return job_id


def remove_job(task_id: int) -> bool:
    """Remove the task's job from the scheduler.

    Returns:
        True if a job was found and removed, False if none was registered.
    """
    job_id = f"task_{task_id}"
    if scheduler.get_job(job_id) is None:
        return False
    scheduler.remove_job(job_id)
    logger.info(f"已移除定时任务: {job_id}")
    return True


def is_job_running(task_id: int) -> bool:
    """Return True if a job for the task is currently registered in the scheduler."""
    job_id = f"task_{task_id}"
    return scheduler.get_job(job_id) is not None


def get_all_jobs() -> Dict[str, dict]:
    """Describe every job known to the scheduler.

    Returns:
        A mapping of job ID to a dict with the job's ``name``, its
        ``next_run_time`` as an ISO-8601 string (or None when unset), and the
        string form of its ``trigger``.
    """
    result = {}
    for job in scheduler.get_jobs():
        # getattr guards against job objects that carry no next_run_time.
        nrt = getattr(job, 'next_run_time', None)
        result[job.id] = {
            "name": job.name,
            "next_run_time": nrt.isoformat() if nrt else None,
            "trigger": str(job.trigger),
        }
    return result