"""
Scheduler service - manages the periodic data-collection jobs with APScheduler.
"""
import logging
import re
from datetime import datetime, timedelta
from typing import Dict, Optional
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.executors.pool import ThreadPoolExecutor
from sqlalchemy.orm import Session
from app.database import SessionLocal
from app.services.collector import fetch_symbol_data
from app.services.cache import save_market_data, update_task_status
from app.config import SCHEDULER_MAX_INSTANCES, MAX_WORKERS
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Single scheduler instance shared by every function in this module.
# Jobs run on a thread pool sized by MAX_WORKERS; each job may have at most
# SCHEDULER_MAX_INSTANCES concurrent runs and a 60 second misfire grace time.
scheduler = BackgroundScheduler(
    executors=dict(default=ThreadPoolExecutor(max_workers=MAX_WORKERS)),
    job_defaults=dict(
        max_instances=SCHEDULER_MAX_INSTANCES,
        misfire_grace_time=60,
    ),
)
def job_handler(task_id: int):
    """
    Entry point executed by the scheduler for one collection task.

    A dedicated DB session is opened for every run (and always closed in the
    ``finally`` clause) so that sessions are never shared across worker
    threads.

    Flow:
        1. Load the task; stop if it is missing or disabled.
        2. Fetch market data for the task's symbol and periods.
        3. If the result contains ``"timeframes"``, persist it and mark the
           task ``"success"``; a task of type ``"once"`` is additionally
           flagged finished, disabled and removed from the scheduler.
        4. Otherwise mark the task ``"failed"``.

    Any exception is logged with its traceback and the task is marked
    ``"failed"`` on a best-effort basis; exceptions never propagate to the
    scheduler.

    Args:
        task_id: ID of the task to run.
    """
    db: Session = SessionLocal()
    try:
        # Imported here rather than at module level, as in the original code.
        from app.services.cache import get_task

        task = get_task(db, task_id)
        if not task or not task.enabled:
            logger.warning(f"任务 {task_id} 不存在或已禁用,停止执行")
            return

        # Tolerate stray whitespace and empty entries ("1m, 5m," -> ["1m", "5m"])
        # so that malformed period names are not handed to the collector.
        periods = [
            part.strip()
            for part in (task.periods or "").split(",")
            if part.strip()
        ]
        logger.info(f"[定时任务] 开始采集 {task.symbol} (periods={periods})")

        result = fetch_symbol_data(
            symbol=task.symbol,
            data_type=task.data_type,
            periods=periods,
            max_workers=MAX_WORKERS,
        )

        if result.get("timeframes"):
            save_market_data(db, task.symbol, result)
            update_task_status(db, task_id, "success")
            logger.info(f"[定时任务] {task.symbol} 采集成功")

            # A one-shot task is marked finished and dropped from the scheduler.
            if task.task_type == "once":
                task.is_finished = True
                task.enabled = False
                db.commit()
                remove_job(task_id)
                logger.info(f"[定时任务] {task.symbol} 一次性任务已完成")
        else:
            update_task_status(db, task_id, "failed")
            logger.error(f"[定时任务] {task.symbol} 采集失败: {result.get('error')}")
    except Exception as e:
        # exc_info adds the traceback; the message itself is unchanged.
        logger.error(f"[定时任务] 执行异常 task_id={task_id}: {e}", exc_info=True)
        try:
            # The session may be stuck in a failed transaction; roll it back
            # first, otherwise the status update below cannot succeed.
            db.rollback()
            update_task_status(db, task_id, "failed")
        except Exception:
            # Still best-effort, but no longer silent.
            logger.exception(f"[定时任务] 无法将任务标记为失败 task_id={task_id}")
    finally:
        db.close()
def start_scheduler():
    """Start the shared scheduler; do nothing if it is already running."""
    if scheduler.running:
        return
    scheduler.start()
    logger.info("调度器已启动")
def stop_scheduler():
    """Shut the shared scheduler down without waiting; do nothing if it is not running."""
    if not scheduler.running:
        return
    scheduler.shutdown(wait=False)
    logger.info("调度器已停止")
def _parse_run_time(run_time: str) -> tuple:
    """Validate an ``HH:MM`` string and return it as ``(hour, minute)`` ints.

    Raises:
        ValueError: if the text is not two digits, a colon and two digits, or
            if the hour/minute values are outside 0-23 / 0-59.
    """
    if not re.match(r'^\d{2}:\d{2}$', run_time):
        raise ValueError(f"run_time格式错误: {run_time}, 应为 HH:MM")
    hour, minute = map(int, run_time.split(":"))
    if not (0 <= hour <= 23 and 0 <= minute <= 59):
        raise ValueError(f"run_time值无效: {run_time}, 小时应为0-23, 分钟应为0-59")
    return hour, minute


def add_job(task_id: int, interval_seconds: int, task_type: str = "interval", run_time: Optional[str] = None) -> str:
    """
    Register (or re-register) the scheduler job for a task.

    The trigger depends on ``task_type``:
        * ``"daily"`` with ``run_time``: cron trigger firing every day at
          HH:MM, Asia/Shanghai time.
        * ``"once"`` with ``run_time``: date trigger firing at the next
          occurrence of HH:MM, Asia/Shanghai time (today, or tomorrow if that
          time has already passed).
        * anything else, or a missing ``run_time``: interval trigger firing
          every ``interval_seconds`` seconds.

    An invalid ``run_time`` is logged and the job falls back to the interval
    trigger instead of raising.

    Args:
        task_id: ID of the task; the job id is ``task_<task_id>``.
        interval_seconds: Interval in seconds for the interval trigger.
        task_type: Task type (``interval``, ``daily`` or ``once``).
        run_time: Execution time as ``HH:MM`` (used by ``daily`` and ``once``).

    Returns:
        The job id.
    """
    # Local import: only this function needs it.
    from datetime import timezone

    job_id = f"task_{task_id}"

    # Drop any previous job registered for this task.
    if scheduler.get_job(job_id):
        scheduler.remove_job(job_id)

    # Pick the trigger according to the task type.
    try:
        if task_type == "daily" and run_time:
            hour, minute = _parse_run_time(run_time)
            trigger = CronTrigger(hour=hour, minute=minute, timezone="Asia/Shanghai")
        elif task_type == "once" and run_time:
            hour, minute = _parse_run_time(run_time)
            # The naive run date below is interpreted by DateTrigger in
            # Asia/Shanghai, so "now" must be Shanghai wall-clock time too.
            # Using the server's local clock would make the "already passed"
            # check wrong on hosts in another time zone. Shanghai is a fixed
            # UTC+8 offset (no DST), so a fixed-offset zone is sufficient.
            now = datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None)
            run_dt = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
            if run_dt <= now:
                # Time already passed today: run tomorrow. timedelta handles
                # month/year rollover correctly.
                run_dt = run_dt + timedelta(days=1)
            trigger = DateTrigger(run_date=run_dt, timezone="Asia/Shanghai")
        else:
            # Default: run at a fixed interval.
            trigger = IntervalTrigger(seconds=interval_seconds)
    except ValueError as e:
        logger.error(f"任务 {task_id} 时间配置错误: {e}, 使用默认间隔触发器")
        trigger = IntervalTrigger(seconds=interval_seconds)

    scheduler.add_job(
        func=job_handler,
        trigger=trigger,
        args=[task_id],
        id=job_id,
        name=f"auto_collect_{task_id}",
        replace_existing=True,
    )
    logger.info(f"已添加定时任务: job_id={job_id}, type={task_type}, trigger={trigger}")
    return job_id
def remove_job(task_id: int) -> bool:
    """
    Remove the scheduler job belonging to a task.

    The removal is attempted directly instead of checking ``get_job`` first:
    the scheduler may drop a job on its own between the check and the removal
    (for example a one-shot job whose trigger has no further run times), and
    the resulting ``JobLookupError`` would otherwise escape to the caller.

    Args:
        task_id: ID of the task; the job id is ``task_<task_id>``.

    Returns:
        True if a job was removed, False if no such job was registered.
    """
    # Local import: only this function needs it.
    from apscheduler.jobstores.base import JobLookupError

    job_id = f"task_{task_id}"
    try:
        scheduler.remove_job(job_id)
    except JobLookupError:
        return False
    logger.info(f"已移除定时任务: {job_id}")
    return True
def is_job_running(task_id: int) -> bool:
    """Return True if a job with id ``task_<task_id>`` is registered in the scheduler."""
    registered = scheduler.get_job(f"task_{task_id}")
    return registered is not None
def get_all_jobs() -> Dict[str, dict]:
    """
    Describe every job currently known to the scheduler.

    Returns:
        A mapping of job id to a dict with the job's ``name``, its
        ``next_run_time`` as an ISO-8601 string (None when the job has no
        such attribute or it is unset) and the string form of its ``trigger``.
    """
    def _describe(job) -> dict:
        # Read defensively: fall back to None if the attribute is absent.
        upcoming = getattr(job, 'next_run_time', None)
        return {
            "name": job.name,
            "next_run_time": upcoming.isoformat() if upcoming else None,
            "trigger": str(job.trigger),
        }

    return {job.id: _describe(job) for job in scheduler.get_jobs()}