python 如何使用APScheduler设置作业持续时间的限制?

a64a0gku  于 2022-12-10  发布在  Python
关注(0)|答案(2)|浏览(446)

我为调度程序设置了 “max_instances=10”,因此最多可以有 10 个作业同时运行。有时某些作业会阻塞并一直挂在那里。当阻塞的作业达到 10 个时,就会出现 “skipped: maximum number of running instances reached (10)”(跳过:已达到最大运行实例数(10))。APScheduler 是否有办法设置作业的最长运行时间,让运行时间超过该上限的作业被终止?如果没有办法,我该怎么办?

8yoxcaq7

8yoxcaq71#

APScheduler 无法设置作业的最大运行时间。这主要是因为 PoolExecutor 所使用的底层 concurrent.futures 包不支持这样的功能。虽然子进程可以被终止,但由于缺少相应的 API,APScheduler 必须提供专门的执行器才能支持此功能,更不用说还需要在作业 API 中增加超时设置。这是下一个主要版本要考虑的事情。
问题是,您希望如何处理仍在运行作业的线程?由于线程不能被强制终止,因此唯一的选择是让它继续运行,但这样仍然会使线程处于忙碌状态。

sulc1iza

sulc1iza2#

我在下面为 ThreadPoolExecutor 写了一个简陋(poor man's)的实现。

  • 将此视为您自己研究的非权威性示例
  • 用一个包装(wrap)实际作业函数的控制器函数来装饰作业
  • 有一种方法可以调用检查函数(从另一个线程)来查看是否有任何任务超过了它的时间
  • 直接崩溃退出(go down in flames),而不是尝试从挂起状态中恢复
  • 因为我们希望硬崩溃(任何情况都视为不可恢复),所以只要有任何作业引发异常,我们也会关闭整个调度程序

这段代码包含一些我懒得清理的项目内部上下文相关代码。
调度程序包装类:

"""Chain specific tasks scheduler."""
import datetime
import logging
import threading
from dataclasses import dataclass
from typing import Callable, Dict, List

from apscheduler.events import EVENT_JOB_ERROR
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers import SchedulerNotRunningError
from apscheduler.schedulers.background import BackgroundScheduler

from dex_ohlcv.processcontext import ProcessContext
from dex_ohlcv.timer import timed_task

# Module-level logger, named after this module (``__name__``).
logger = logging.getLogger(__name__)

class TaskTimeLimitExceeded(Exception):
    """A background task has been running for longer than its allowed maximum duration."""

    pass

@dataclass
class TaskControl:
    """Bookkeeping record for one background task that is currently running.

    Stores when the task started and how long it may run, so that tasks
    overrunning their budget (likely hung) can be detected and the
    background scheduler shut down.
    """

    #: Name of the task
    name: str

    #: Start timestamp (UTC)
    started_at: datetime.datetime

    #: Maximum allowed run time, counted from ``started_at``
    max_duration: datetime.timedelta

class OracleScheduler:
    """Thread pool task scheduler built on APScheduler.

    Jobs are registered with the :py:meth:`job` decorator and run on a
    bounded thread pool by an APScheduler ``BackgroundScheduler``.

    Every running job is recorded in :py:attr:`active_tasks`, so the owner
    can detect hung jobs with :py:meth:`get_exceeded_tasks` or
    :py:meth:`check_control_and_die`. A hung job cannot be aborted (CPython
    threads cannot be killed), so the remedy is to shut the whole scheduler
    down and let the caller exit.

    One oracle can have two different schedulers.
    """

    def __init__(self, context: ProcessContext, *, thread_count: int = 10):
        """Create the scheduler. Jobs are added afterwards with :py:meth:`job`.

        :param context:
            Passed as the only argument to every job function.
            Must have a truthy ``chain_id``.

        :param thread_count:
            Number of worker threads, i.e. how many jobs (across all job
            definitions) can run at the same time.
        """
        self.context = context
        self.died = False
        self.thread_count = thread_count

        # Keep the active tasks in memory so we can die if any of them
        # exceeds its maximum duration. Tasks are keyed by the native thread
        # id of the worker running them. The dict is written by worker
        # threads and read by whichever thread runs the checks, so every
        # access goes through the lock.
        self.active_tasks: Dict[int, TaskControl] = {}
        self._tasks_lock = threading.Lock()

        self.scheduler = self._create_scheduler()

        assert context.chain_id

    def is_dead(self) -> bool:
        """Check if the scheduler has died because of an exception in tasks."""
        return self.died

    def _create_scheduler(self):
        """Build the APScheduler instance backed by a ``thread_count`` sized pool."""

        executors = {
            'default': ThreadPoolExecutor(self.thread_count),
        }

        def listen_error(event):
            # Exceptions raised by job functions are normally caught by the
            # wrapper created in job(), which calls die(); this listener only
            # sees exceptions that escape that wrapper.
            if event.exception:
                logger.info("Scheduled task received exception. event: %s, exception: %s", event, event.exception)
            else:
                logger.error("Should not happen")

        scheduler = BackgroundScheduler(executors=executors, timezone=datetime.timezone.utc)
        scheduler.add_listener(listen_error, EVENT_JOB_ERROR)
        return scheduler

    def _shutdown(self, wait: bool = False):
        """Shut down the scheduler; do nothing if it is already stopped.

        Both a crashing task (:py:meth:`die`) and the watchdog
        (:py:meth:`check_control_and_die`) may stop the scheduler, and
        several tasks may crash. Whichever caller comes second must not fail
        with ``SchedulerNotRunningError``.
        """
        try:
            self.scheduler.shutdown(wait=wait)
        except SchedulerNotRunningError:
            pass

    def die(self, exc: Exception):
        """Shut down the scheduler and mark it as dead.

        Used by task control to abort in the case any task raises an
        exception. The dead flag is set before shutting down, and calling
        this again (e.g. a second task failing) is harmless.

        :param exc:
            The exception that caused the shutdown; logged with its traceback.
        """
        self.died = True
        logger.error(
            "The background scheduler is dying because one of the tasks received exception: %s",
            exc,
            # Explicit exc_info so the traceback is logged even when this is
            # not called from inside an ``except`` block.
            exc_info=exc,
        )
        self._shutdown(wait=False)

    def job(self,
            interval: datetime.timedelta,
            start_time=datetime.datetime(1970, 1, 1),
            max_instances=1,
            max_duration=datetime.timedelta(hours=2),
            ) -> Callable:
        """Decorator to create managed background jobs.

        - The decorated function takes one argument, the context given to
          this scheduler: ``task(context: ProcessContext)``.

        - All tasks are automatically tracked with :py:func:`timed_task`.

        - If any job fails with an exception, the whole scheduler is shut down
          (:py:meth:`die`); the oracle should notice this by checking
          :py:meth:`is_dead` in its duty cycle loop.

        - The decorated function itself is returned unchanged, so its name
          stays bound to the function.

        :param interval:
            How often the task should fire.

        :param start_time:
            Sets the start of the interval cycles. Passed to APScheduler add_job.

        :param max_instances:
            Limit the number of concurrent runs of this job. Passed to APScheduler add_job.

        :param max_duration:
            Max duration for a single run of this task.

            If exceeded, the background scheduler can unwind
            in :py:meth:`check_control_and_die`.

        :return:
            The decorator.
        """

        assert isinstance(interval, datetime.timedelta)

        context = self.context

        # https://stackoverflow.com/a/5929165/315168
        def _outer(func):
            name = func.__name__

            def _inner():
                task_id = threading.get_native_id()
                task_control = TaskControl(
                    name,
                    datetime.datetime.utcnow(),
                    max_duration,
                )

                with self._tasks_lock:
                    self.active_tasks[task_id] = task_control
                try:
                    with timed_task(f"oracle.scheduled_task.{name}",
                                    chain_id=context.chain_id.name,
                                    task_type="oracle_scheduled_task"):
                        res = func(context)

                    # Warn if tasks start to take too long
                    duration = datetime.datetime.utcnow() - task_control.started_at
                    if duration > interval:
                        logger.warning("Task %s took %s, more than interval %s", name, duration, interval)

                    return res
                except Exception as e:
                    # Deliberately not re-raised: die() shuts the scheduler
                    # down and the owner detects it through is_dead().
                    self.die(e)
                finally:
                    with self._tasks_lock:
                        self.active_tasks.pop(task_id, None)

            self.scheduler.add_job(
                _inner,
                'interval',
                seconds=interval.total_seconds(),
                start_date=start_time,
                max_instances=max_instances,
                # Name the job after the user function instead of "_inner"
                # so APScheduler's own log lines are readable.
                name=name,
            )
            return func

        return _outer

    def get_exceeded_tasks(self) -> List[TaskControl]:
        """Return the running tasks that have exceeded their time limit."""
        now_ = datetime.datetime.utcnow()
        # Snapshot under the lock: worker threads add and remove entries
        # concurrently, and iterating a dict while it changes size raises.
        with self._tasks_lock:
            running = list(self.active_tasks.values())
        return [tc for tc in running if now_ > tc.started_at + tc.max_duration]

    def check_control_and_die(self):
        """Check if any of the tasks have exceeded their time limit and die.

        Running threads cannot be aborted in CPython, so the stuck task
        itself keeps running. This shuts the scheduler down without waiting,
        so no new jobs are started, and raises so that the caller can
        terminate the process.

        :raise TaskTimeLimitExceeded:
            If any of background tasks have exceeded
            their allocated life time. The message describes the first
            stuck task; all stuck tasks are logged.
        """
        stuck = self.get_exceeded_tasks()

        if not stuck:
            return

        logger.critical("Background tasks exceeded their time limits. Likely hung?")

        # Tolerates a scheduler that die() already shut down, so the caller
        # still gets TaskTimeLimitExceeded rather than SchedulerNotRunningError.
        self._shutdown(wait=False)

        # Log every stuck task, but raise for the first one
        now_ = datetime.datetime.utcnow()
        messages = []
        for tc in stuck:
            exceeded = (now_ - tc.started_at) - tc.max_duration
            error_msg = f"Task {tc.name} is stuck. Started {tc.started_at}, limit {tc.max_duration}, exceeded time limit with {exceeded}"
            logger.error(error_msg)
            messages.append(error_msg)

        raise TaskTimeLimitExceeded(messages[0])

    def start(self):
        """Start the scheduler.

        Runs on a background thread and returns instantly.
        """
        jobs = self.scheduler.get_jobs()
        logger.info("Background scheduler starting. We have %d jobs", len(jobs))
        self.scheduler.start()

    def stop(self, wait=False):
        """Stop the scheduler.

        :param wait:
            Passed to APScheduler ``shutdown()``: wait for currently
            running jobs to finish before returning.
        """
        self.scheduler.shutdown(wait=wait)

示例测试:

"""Oracle background scheduled tasks tests."""

import datetime
import time

import pytest

from dex_ohlcv.scheduler.scheduler import OracleScheduler, TaskTimeLimitExceeded

def test_job_success(test_context):
    """Background jobs run and finish without killing the scheduler."""
    # The nested job's annotation is evaluated when its ``def`` executes,
    # so ProcessContext must be in scope or the test fails with NameError.
    from dex_ohlcv.processcontext import ProcessContext

    succeed = False

    scheduler = OracleScheduler(test_context)

    @scheduler.job(interval=datetime.timedelta(seconds=1))
    def test_job(context: ProcessContext):
        nonlocal succeed
        succeed = True

    scheduler.start()
    try:
        time.sleep(3)
    finally:
        scheduler.stop()

    assert not scheduler.is_dead()
    assert succeed is True

def test_job_crashed(test_context):
    """A job raising an exception kills the background scheduler."""
    # The nested job's annotation is evaluated when its ``def`` executes,
    # so ProcessContext must be in scope or the test fails with NameError.
    from dex_ohlcv.processcontext import ProcessContext

    scheduler = OracleScheduler(test_context)

    @scheduler.job(interval=datetime.timedelta(seconds=1))
    def test_job(context: ProcessContext):
        raise RuntimeError("Oh no")

    scheduler.start()
    try:
        time.sleep(3)
        assert scheduler.is_dead()
    finally:
        # On success die() already shut the scheduler down; only clean up
        # here if it unexpectedly stayed alive, so no threads leak.
        if not scheduler.is_dead():
            scheduler.stop()

def test_job_not_exceeded_time_limit(test_context):
    """Background jobs stay within their time limits."""
    # The nested job's annotation is evaluated when its ``def`` executes,
    # so ProcessContext must be in scope or the test fails with NameError.
    from dex_ohlcv.processcontext import ProcessContext

    scheduler = OracleScheduler(test_context)

    @scheduler.job(interval=datetime.timedelta(seconds=1), max_duration=datetime.timedelta(seconds=30))
    def test_job(context: ProcessContext):
        time.sleep(4)

    scheduler.start()
    try:
        # The first run fires on the next whole-second boundary (start_time
        # defaults to the epoch), so after 2 s the job is reliably running.
        time.sleep(2)
        assert scheduler.active_tasks, "job should be running by now"
        assert not scheduler.get_exceeded_tasks()
    finally:
        scheduler.stop()

def test_job_time_limit_exceeded(test_context):
    """A background job exceeding its time limit is detected."""
    # The nested job's annotation is evaluated when its ``def`` executes,
    # so ProcessContext must be in scope or the test fails with NameError.
    from dex_ohlcv.processcontext import ProcessContext

    scheduler = OracleScheduler(test_context)

    @scheduler.job(interval=datetime.timedelta(seconds=1), max_duration=datetime.timedelta(seconds=0))
    def test_job(context: ProcessContext):
        time.sleep(4)

    scheduler.start()
    # The first run fires on the next whole-second boundary (start_time
    # defaults to the epoch); sleeping only 1 s races that first fire.
    time.sleep(2)

    # check_control_and_die() also shuts the scheduler down
    with pytest.raises(TaskTimeLimitExceeded):
        scheduler.check_control_and_die()

相关问题