You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

139 lines
3.9 KiB

"""
调度服务 - APScheduler 管理定时采集任务
"""
import logging
from datetime import datetime
from typing import Dict, Optional
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.executors.pool import ThreadPoolExecutor
from sqlalchemy.orm import Session
from app.database import SessionLocal
from app.services.collector import fetch_symbol_data
from app.services.cache import save_market_data, update_task_status
from app.config import SCHEDULER_MAX_INSTANCES, MAX_WORKERS
logger = logging.getLogger(__name__)
scheduler = BackgroundScheduler(
executors={"default": ThreadPoolExecutor(max_workers=MAX_WORKERS)},
job_defaults={
"max_instances": SCHEDULER_MAX_INSTANCES,
"misfire_grace_time": 60,
},
)
def job_handler(task_id: int):
"""
定时任务的执行函数
每个任务独立创建 DB session避免跨线程问题
"""
db: Session = SessionLocal()
try:
from app.services.cache import get_task
task = get_task(db, task_id)
if not task or not task.enabled:
logger.warning(f"任务 {task_id} 不存在或已禁用,停止执行")
return
periods = task.periods.split(",") if task.periods else []
logger.info(f"[定时任务] 开始采集 {task.symbol} (periods={periods})")
result = fetch_symbol_data(
symbol=task.symbol,
data_type=task.data_type,
periods=periods,
max_workers=MAX_WORKERS,
)
if result.get("timeframes"):
save_market_data(db, task.symbol, result)
update_task_status(db, task_id, "success")
logger.info(f"[定时任务] {task.symbol} 采集成功")
else:
update_task_status(db, task_id, "failed")
logger.error(f"[定时任务] {task.symbol} 采集失败: {result.get('error')}")
except Exception as e:
logger.error(f"[定时任务] 执行异常 task_id={task_id}: {e}")
try:
update_task_status(db, task_id, "failed")
except Exception:
pass
finally:
db.close()
def start_scheduler():
"""启动调度器"""
if not scheduler.running:
scheduler.start()
logger.info("调度器已启动")
def stop_scheduler():
"""停止调度器"""
if scheduler.running:
scheduler.shutdown(wait=False)
logger.info("调度器已停止")
def add_job(task_id: int, interval_seconds: int) -> str:
"""
添加定时任务到调度器
Returns:
job_id
"""
job_id = f"task_{task_id}"
# 如果已存在,先移除
if scheduler.get_job(job_id):
scheduler.remove_job(job_id)
scheduler.add_job(
func=job_handler,
trigger=IntervalTrigger(seconds=interval_seconds),
args=[task_id],
id=job_id,
name=f"auto_collect_{task_id}",
replace_existing=True,
)
logger.info(f"已添加定时任务: job_id={job_id}, interval={interval_seconds}s")
return job_id
def remove_job(task_id: int) -> bool:
"""移除定时任务"""
job_id = f"task_{task_id}"
job = scheduler.get_job(job_id)
if job:
scheduler.remove_job(job_id)
logger.info(f"已移除定时任务: {job_id}")
return True
return False
def is_job_running(task_id: int) -> bool:
"""检查任务是否正在调度器中运行"""
job_id = f"task_{task_id}"
return scheduler.get_job(job_id) is not None
def get_all_jobs() -> Dict[str, dict]:
"""获取所有活跃任务信息"""
jobs = scheduler.get_jobs()
result = {}
for job in jobs:
nrt = getattr(job, 'next_run_time', None)
result[job.id] = {
"name": job.name,
"next_run_time": nrt.isoformat() if nrt else None,
"trigger": str(job.trigger),
}
return result