diff --git a/app/__pycache__/models.cpython-311.pyc b/app/__pycache__/models.cpython-311.pyc index 92ecb2a..fc97c34 100644 Binary files a/app/__pycache__/models.cpython-311.pyc and b/app/__pycache__/models.cpython-311.pyc differ diff --git a/app/__pycache__/schemas.cpython-311.pyc b/app/__pycache__/schemas.cpython-311.pyc index c8267b2..7cafdde 100644 Binary files a/app/__pycache__/schemas.cpython-311.pyc and b/app/__pycache__/schemas.cpython-311.pyc differ diff --git a/app/api/__pycache__/tasks.cpython-311.pyc b/app/api/__pycache__/tasks.cpython-311.pyc index 705e36e..c97bc57 100644 Binary files a/app/api/__pycache__/tasks.cpython-311.pyc and b/app/api/__pycache__/tasks.cpython-311.pyc differ diff --git a/app/api/tasks.py b/app/api/tasks.py index 4c816be..72be69c 100644 --- a/app/api/tasks.py +++ b/app/api/tasks.py @@ -2,11 +2,13 @@ 定时任务接口 - 创建/启动/停止/删除/列表 """ import logging +from typing import Optional from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.orm import Session from app.database import get_db +from app.models import ScheduledTask from app.schemas import ( CreateTaskRequest, TaskInfo, @@ -35,18 +37,23 @@ router = APIRouter(prefix="/tasks", tags=["定时任务"]) def create_new_task(req: CreateTaskRequest, db: Session = Depends(get_db)): """ 创建并启动一个定时采集任务。 - 输入品种合约和轮询时长,自动开始定时获取数据。 + 输入品种合约和轮询时长,自动开始定时获取数据。 """ + # 将periods数组转为逗号分隔的字符串 + periods_str = ",".join(req.periods) if isinstance(req.periods, list) else req.periods + task = create_task( db=db, symbol=req.symbol, data_type=req.data_type, - periods=req.periods, + periods=periods_str, interval_seconds=req.interval_seconds, + task_type=req.task_type, + run_time=req.run_time, ) # 注册到调度器 - job_id = add_job(task.id, task.interval_seconds) + job_id = add_job(task.id, task.interval_seconds, task.task_type, task.run_time) task.job_id = job_id db.commit() db.refresh(task) @@ -56,30 +63,63 @@ def create_new_task(req: CreateTaskRequest, db: Session = Depends(get_db)): @router.get("", response_model=TaskListResponse) def list_all_tasks(db: Session = Depends(get_db)): - """列出所有定时任务""" - tasks = list_tasks(db) + """列出所有定时任务(未完成的)""" + tasks = db.query(ScheduledTask).filter( + ScheduledTask.is_finished == False + ).order_by(ScheduledTask.created_at.desc()).all() job_status = get_all_jobs() task_infos = [] for t in tasks: - running = is_job_running(t.id) if t.enabled else False - task_infos.append(TaskInfo( - id=t.id, - symbol=t.symbol, - data_type=t.data_type, - periods=t.periods.split(",") if t.periods else [], - interval_seconds=t.interval_seconds, - enabled=t.enabled, - running=running, - last_run=t.last_run.isoformat() if t.last_run else None, - last_status=t.last_status, - created_at=t.created_at.isoformat(), - updated_at=t.updated_at.isoformat(), - )) + job_id = f"task_{t.id}" + job_info = job_status.get(job_id) + + task_infos.append(_to_task_info(t, job_info)) return TaskListResponse(tasks=task_infos, total=len(task_infos)) +@router.get("/history", response_model=TaskListResponse) +def list_finished_tasks(db: Session = Depends(get_db)): + """列出已完成的历史任务""" + tasks = db.query(ScheduledTask).filter( + ScheduledTask.is_finished == True + ).order_by(ScheduledTask.updated_at.desc()).all() + + task_infos = [] + for t in tasks: + task_infos.append(_to_task_info(t, None)) + + return TaskListResponse(tasks=task_infos, total=len(task_infos)) + + +@router.post("/{task_id}/rerun", response_model=TaskInfo) +def rerun_task(task_id: int, db: Session = Depends(get_db)): + """重新执行已完成的任务""" + task = get_task(db, task_id) + if not task: + raise HTTPException(status_code=404, detail=f"任务 {task_id} 不存在") + + if not task.is_finished: + raise HTTPException(status_code=400, detail=f"任务 {task_id} 尚未完成,无法重新执行") + + # 重置任务状态 + task.is_finished = False + task.enabled = True + task.last_run = None + task.last_status = None + db.commit() + db.refresh(task) + + # 重新注册到调度器 + job_id = add_job(task.id, task.interval_seconds, task.task_type, task.run_time) + task.job_id = job_id + db.commit() + db.refresh(task) + + return _to_task_info(task) + + @router.post("/{task_id}/stop", response_model=TaskInfo) def stop_task(task_id: int, db: Session = Depends(get_db)): """停止定时任务(从调度器移除,但保留配置)""" @@ -101,7 +141,7 @@ def start_task(task_id: int, db: Session = Depends(get_db)): raise HTTPException(status_code=404, detail=f"任务 {task_id} 不存在") enable_task(db, task_id) - add_job(task.id, task.interval_seconds) + add_job(task.id, task.interval_seconds, task.task_type, task.run_time) db.refresh(task) return _to_task_info(task) @@ -139,23 +179,29 @@ def update_interval( # 如果任务正在运行,更新调度器 if task.enabled and is_job_running(task_id): remove_job(task_id) - add_job(task.id, task.interval_seconds) + add_job(task.id, task.interval_seconds, task.task_type, task.run_time) return _to_task_info(task) -def _to_task_info(task) -> TaskInfo: +def _to_task_info(task, job_info: Optional[dict] = None) -> TaskInfo: """ORM -> Pydantic""" + next_run = None + if job_info and job_info.get("next_run_time"): + next_run = job_info["next_run_time"] + return TaskInfo( id=task.id, symbol=task.symbol, data_type=task.data_type, periods=task.periods.split(",") if task.periods else [], interval_seconds=task.interval_seconds, + task_type=task.task_type if hasattr(task, 'task_type') else 'interval', enabled=task.enabled, running=is_job_running(task.id), last_run=task.last_run.isoformat() if task.last_run else None, last_status=task.last_status, + next_run=next_run, created_at=task.created_at.isoformat(), updated_at=task.updated_at.isoformat(), ) diff --git a/app/models.py b/app/models.py index 1e0fa98..d3a9a0b 100644 --- a/app/models.py +++ b/app/models.py @@ -37,7 +37,10 @@ class ScheduledTask(Base): data_type = Column(String(16), nullable=False, default="futures", comment="数据类型") periods = Column(String(256), nullable=False, comment="周期列表(逗号分隔), 如 5min,15min,60min") interval_seconds = Column(Integer, nullable=False, default=300, comment="轮询间隔(秒)") + task_type = Column(String(16), nullable=False, default="interval", comment="任务类型: interval, daily, once") + run_time = Column(String(8), nullable=True, comment="执行时间,格式 HH:MM") enabled = Column(Boolean, nullable=False, default=True, comment="是否启用") + is_finished = Column(Boolean, nullable=False, default=False, comment="是否已完成(仅一次任务执行完成后为True)") job_id = Column(String(64), nullable=True, unique=True, comment="APScheduler job_id") last_run = Column(DateTime, nullable=True, comment="最后执行时间") last_status = Column(String(16), nullable=True, comment="最后状态: success/failed") diff --git a/app/schemas.py b/app/schemas.py index 6545c4b..a868980 100644 --- a/app/schemas.py +++ b/app/schemas.py @@ -70,9 +70,9 @@ class CreateTaskRequest(BaseModel): """创建定时任务请求""" symbol: str = Field(..., description="品种合约代码") data_type: str = Field(default="futures", description="数据类型") - periods: List[str] = Field( - default=["5min", "15min", "30min", "60min", "daily"], - description="需要定时获取的周期" + periods: str = Field( + default="5min,15min,30min,60min,daily", + description="需要定时获取的周期,逗号分隔" ) interval_seconds: int = Field( default=300, @@ -80,6 +80,8 @@ class CreateTaskRequest(BaseModel): le=86400, description="轮询间隔(秒),范围 30~86400" ) + task_type: str = Field(default="interval", description="任务类型: interval, daily, once") + run_time: Optional[str] = Field(default=None, description="执行时间,格式 HH:MM") class TaskInfo(BaseModel): @@ -89,10 +91,12 @@ class TaskInfo(BaseModel): data_type: str periods: List[str] interval_seconds: int + task_type: str = Field(default="interval", description="任务类型: interval, daily, once") enabled: bool running: bool = Field(description="当前是否正在运行") last_run: Optional[str] = None last_status: Optional[str] = None + next_run: Optional[str] = Field(default=None, description="下次执行时间") created_at: str updated_at: str diff --git a/app/services/__pycache__/cache.cpython-311.pyc b/app/services/__pycache__/cache.cpython-311.pyc index 4c85702..8c19002 100644 Binary files a/app/services/__pycache__/cache.cpython-311.pyc and b/app/services/__pycache__/cache.cpython-311.pyc differ diff --git a/app/services/__pycache__/scheduler.cpython-311.pyc b/app/services/__pycache__/scheduler.cpython-311.pyc index 0bb9be2..5e1f5a2 100644 Binary files a/app/services/__pycache__/scheduler.cpython-311.pyc and b/app/services/__pycache__/scheduler.cpython-311.pyc differ diff --git a/app/services/cache.py b/app/services/cache.py index de9eea8..423a4c0 100644 --- a/app/services/cache.py +++ b/app/services/cache.py @@ -177,16 +177,20 @@ def create_task( db: Session, symbol: str, data_type: str, - periods: List[str], + periods: str, interval_seconds: int, + task_type: str = "interval", + run_time: Optional[str] = None, ) -> ScheduledTask: """创建定时任务配置""" existing = db.query(ScheduledTask).filter_by( symbol=symbol, data_type=data_type ).first() if existing: - existing.periods = ",".join(periods) + existing.periods = periods existing.interval_seconds = interval_seconds + existing.task_type = task_type + existing.run_time = run_time existing.enabled = True existing.updated_at = datetime.now() db.commit() @@ -196,8 +200,10 @@ def create_task( task = ScheduledTask( symbol=symbol, data_type=data_type, - periods=",".join(periods), + periods=periods, interval_seconds=interval_seconds, + task_type=task_type, + run_time=run_time, enabled=True, ) db.add(task) diff --git a/app/services/scheduler.py b/app/services/scheduler.py index a451834..a385ee5 100644 --- a/app/services/scheduler.py +++ b/app/services/scheduler.py @@ -2,11 +2,14 @@ 调度服务 - APScheduler 管理定时采集任务 """ import logging -from datetime import datetime +import re +from datetime import datetime, timedelta from typing import Dict, Optional from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.interval import IntervalTrigger +from apscheduler.triggers.cron import CronTrigger +from apscheduler.triggers.date import DateTrigger from apscheduler.executors.pool import ThreadPoolExecutor from sqlalchemy.orm import Session @@ -54,6 +57,14 @@ def job_handler(task_id: int): save_market_data(db, task.symbol, result) update_task_status(db, task_id, "success") logger.info(f"[定时任务] {task.symbol} 采集成功") + + # 如果是一次性任务,标记为已完成并从调度器移除 + if task.task_type == "once": + task.is_finished = True + task.enabled = False + db.commit() + remove_job(task_id) + logger.info(f"[定时任务] {task.symbol} 一次性任务已完成") else: update_task_status(db, task_id, "failed") logger.error(f"[定时任务] {task.symbol} 采集失败: {result.get('error')}") @@ -82,10 +93,16 @@ def stop_scheduler(): logger.info("调度器已停止") -def add_job(task_id: int, interval_seconds: int) -> str: +def add_job(task_id: int, interval_seconds: int, task_type: str = "interval", run_time: Optional[str] = None) -> str: """ 添加定时任务到调度器。 + Args: + task_id: 任务ID + interval_seconds: 间隔秒数 + task_type: 任务类型 (interval, daily, once) + run_time: 执行时间,格式 HH:MM (用于daily和once类型) + Returns: job_id """ @@ -95,15 +112,50 @@ def add_job(task_id: int, interval_seconds: int) -> str: if scheduler.get_job(job_id): scheduler.remove_job(job_id) + # 根据任务类型选择trigger + try: + if task_type == "daily" and run_time: + # 每天定时执行 - 验证run_time格式 + if not re.match(r'^\d{2}:\d{2}$', run_time): + raise ValueError(f"run_time格式错误: {run_time}, 应为 HH:MM") + + hour, minute = map(int, run_time.split(":")) + if not (0 <= hour <= 23 and 0 <= minute <= 59): + raise ValueError(f"run_time值无效: {run_time}, 小时应为0-23, 分钟应为0-59") + + trigger = CronTrigger(hour=hour, minute=minute, timezone="Asia/Shanghai") + + elif task_type == "once" and run_time: + # 仅执行一次 - 验证run_time格式 + if not re.match(r'^\d{2}:\d{2}$', run_time): + raise ValueError(f"run_time格式错误: {run_time}, 应为 HH:MM") + + hour, minute = map(int, run_time.split(":")) + if not (0 <= hour <= 23 and 0 <= minute <= 59): + raise ValueError(f"run_time值无效: {run_time}, 小时应为0-23, 分钟应为0-59") + + now = datetime.now() + run_dt = now.replace(hour=hour, minute=minute, second=0, microsecond=0) + if run_dt <= now: + # 如果时间已过,设置为明天(使用timedelta避免月末日期计算错误) + run_dt = run_dt + timedelta(days=1) + trigger = DateTrigger(run_date=run_dt, timezone="Asia/Shanghai") + else: + # 默认:间隔执行 + trigger = IntervalTrigger(seconds=interval_seconds) + except ValueError as e: + logger.error(f"任务 {task_id} 时间配置错误: {e}, 使用默认间隔触发器") + trigger = IntervalTrigger(seconds=interval_seconds) + scheduler.add_job( func=job_handler, - trigger=IntervalTrigger(seconds=interval_seconds), + trigger=trigger, args=[task_id], id=job_id, name=f"auto_collect_{task_id}", replace_existing=True, ) - logger.info(f"已添加定时任务: job_id={job_id}, interval={interval_seconds}s") + logger.info(f"已添加定时任务: job_id={job_id}, type={task_type}, trigger={trigger}") return job_id diff --git a/app/static/index.html b/app/static/index.html index e66d5ba..db74328 100644 --- a/app/static/index.html +++ b/app/static/index.html @@ -924,29 +924,16 @@
-
批量创建定时任务
-
为配置中的所有品种自动创建定时采集任务
+
定时任务管理
+
创建、管理和监控定时数据采集任务
-
-
-
- - -
-
- - -
-
- - -
-
- +
@@ -956,15 +943,53 @@
任务列表
- + +
+
+
+
+ +

暂无定时任务

+
+
+
+
+ + + + + + + + + + + +