"""Chain specific tasks scheduler."""
import logging
import datetime
import threading
from dataclasses import dataclass
from typing import Callable, Dict, List
from apscheduler.events import EVENT_JOB_ERROR
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from dex_ohlcv.processcontext import ProcessContext
from dex_ohlcv.timer import timed_task
# Module-level logger, named after this module so output can be filtered per module.
logger = logging.getLogger(name=__name__)
class TaskTimeLimitExceeded(Exception):
    """Raised when a background task has been running longer than its allowed duration."""
@dataclass
class TaskControl:
    """Metadata about a running background task.

    Mostly used to diagnose a stuck background scheduler and decide whether
    to kill it.
    """

    #: Task name
    name: str

    #: When the task started (naive datetime, UTC)
    started_at: datetime.datetime

    #: Maximum wall-clock time this task is allowed to run before it is
    #: considered stuck
    max_duration: datetime.timedelta
class OracleScheduler:
    """Thread-pool task runner built on APScheduler's ``BackgroundScheduler``.

    Register jobs with the :py:meth:`job` decorator, then call :py:meth:`start`.

    - If any job raises, the scheduler shuts itself down and
      :py:meth:`is_dead` starts returning ``True``; the caller's duty-cycle
      loop is expected to poll it.
    - Running jobs are tracked in ``active_tasks`` so that
      :py:meth:`check_control_and_die` can detect jobs that overran their
      ``max_duration``.

    One oracle can have two different schedulers.
    """

    def __init__(self, context: ProcessContext, thread_count: int = 10):
        """Create a scheduler bound to a process context.

        :param context:
            Passed as the only argument to every job. Must have a truthy
            ``chain_id``.
        :param thread_count:
            Number of worker threads in the executor pool.
        """
        assert context.chain_id
        self.context = context
        self.died = False
        self.thread_count = thread_count
        # Keep the active tasks in memory so we can die if any of them
        # exceeds its maximum duration. Keyed by the native thread id of the
        # worker thread running the task.
        self.active_tasks: Dict[int, TaskControl] = {}
        self.scheduler = self._create_scheduler()
        # Backwards-compatible alias for the previously misspelled attribute.
        self.schedular = self.scheduler

    def is_dead(self) -> bool:
        """Return ``True`` if the scheduler died because a task raised or was killed."""
        return self.died

    def _create_scheduler(self) -> BackgroundScheduler:
        """Build the APScheduler instance with a thread-pool executor."""
        executors = {
            'default': ThreadPoolExecutor(self.thread_count),
        }

        def listen_error(event):
            # Task exceptions are caught by the wrapper created in job(), so
            # this only fires if the wrapper's own bookkeeping raised.
            if event.exception:
                logger.error("Scheduled task received exception. event: %s, exception: %s", event, event.exception)
            else:
                logger.error("Should not happen")

        scheduler = BackgroundScheduler(executors=executors, timezone=datetime.timezone.utc)
        scheduler.add_listener(listen_error, EVENT_JOB_ERROR)
        return scheduler

    def die(self, exc: Exception):
        """Shut down the scheduler and mark it as dead.

        Called by the job wrapper when any task raises, so the duty-cycle
        loop notices via :py:meth:`is_dead` and can exit.

        :param exc:
            The exception raised by the task; logged with its traceback.
        """
        # Record the death first: if shutting down fails (for example another
        # failing task already stopped the scheduler) the flag must still be set.
        self.died = True
        logger.error("The background scheduler is dying because one of the tasks received exception: %s", exc, exc_info=exc)
        self.stop()

    def job(self,
            interval: datetime.timedelta,
            start_time=datetime.datetime(1970, 1, 1),
            max_instances=1,
            max_duration=datetime.timedelta(hours=2),
            ) -> Callable:
        """Decorator to create managed background jobs.

        - The decorated function takes one argument, :py:class:`ProcessContext`:
          `task(context: ProcessContext)`.
        - All tasks are automatically tracked with :py:func:`timed_task`.
        - If any job fails with an exception, the whole scheduler shuts down;
          check :py:meth:`is_dead` in the duty-cycle loop.
        - The decorated function is returned unchanged, so it can still be
          called directly.

        :param interval:
            How often the task should fire.
        :param start_time:
            Sets the start of the interval cycles. Passed to APScheduler ``add_job``.
        :param max_instances:
            Limit the number of concurrent runs. Passed to APScheduler ``add_job``.
        :param max_duration:
            Max duration for a single run of this task.
            If exceeded, the background scheduler can unwind
            in :py:meth:`check_control_and_die`.
        """
        assert isinstance(interval, datetime.timedelta)
        context = self.context

        # https://stackoverflow.com/a/5929165/315168
        def _outer(func):
            name = func.__name__

            def _inner():
                task_id = threading.get_native_id()
                started = datetime.datetime.utcnow()
                self.active_tasks[task_id] = TaskControl(name, started, max_duration)
                try:
                    with timed_task(f"oracle.scheduled_task.{name}",
                                    chain_id=context.chain_id.name,
                                    task_type="oracle_scheduled_task"):
                        res = func(context)
                    # Warn if tasks start to take longer than their interval
                    duration = datetime.datetime.utcnow() - started
                    if duration > interval:
                        logger.warning("Task %s took %s, more than interval %s", name, duration, interval)
                    return res
                except Exception as e:
                    self.die(e)
                finally:
                    # pop() so cleanup itself can never raise KeyError
                    self.active_tasks.pop(task_id, None)

            self.scheduler.add_job(
                _inner,
                'interval',
                seconds=interval.total_seconds(),
                start_date=start_time,
                max_instances=max_instances,
                name=name,
            )
            # A decorator must return something: keep the original function
            # usable instead of rebinding its name to None.
            return func

        return _outer

    def get_exceeded_tasks(self) -> List[TaskControl]:
        """Return the currently running tasks that have exceeded their time limit."""
        now_ = datetime.datetime.utcnow()
        # Snapshot first: worker threads add/remove entries concurrently, and
        # iterating a dict whose size changes raises RuntimeError.
        running = list(self.active_tasks.values())
        return [tc for tc in running if now_ > tc.started_at + tc.max_duration]

    def check_control_and_die(self):
        """Stop the scheduler and raise if any task has exceeded its time limit.

        Threads cannot be forcibly aborted in CPython, so this only stops the
        scheduler from launching further runs; a stuck thread keeps running.

        :raise TaskTimeLimitExceeded:
            If any of the background tasks has exceeded
            its allocated lifetime. Every stuck task is logged; the
            exception describes the first one.
        """
        stuck = self.get_exceeded_tasks()
        if not stuck:
            return

        logger.critical("Background tasks exceeded their time limits. Likely hung?")
        self.stop()

        now_ = datetime.datetime.utcnow()
        error_msgs = []
        for tc in stuck:
            exceeded = (now_ - tc.started_at) - tc.max_duration
            error_msg = f"Task {tc.name} is stuck. Started {tc.started_at}, limit {tc.max_duration}, exceeded time limit with {exceeded}"
            logger.error(error_msg)
            error_msgs.append(error_msg)

        # If we have multiple stuck tasks, raise for the first one
        raise TaskTimeLimitExceeded(error_msgs[0])

    def start(self):
        """Start the scheduler.

        Runs on a background thread and returns instantly.
        """
        jobs = self.scheduler.get_jobs()
        logger.info("Background scheduler starting. We have %d jobs", len(jobs))
        self.scheduler.start()

    def stop(self, wait=False):
        """Stop the scheduler; a no-op if it is not running.

        :param wait:
            Passed to APScheduler ``shutdown``: whether to wait for
            currently executing jobs to finish.
        """
        # APScheduler raises SchedulerNotRunningError on a second shutdown,
        # which happens e.g. when stop() follows die().
        if self.scheduler.running:
            self.scheduler.shutdown(wait=wait)
示例测试:
"""Oracle background scheduled tasks tests."""
import datetime
import time
import pytest
from dex_ohlcv.scheduler.scheduler import OracleScheduler, TaskTimeLimitExceeded
def test_job_success(test_context):
    """Background jobs run successfully and the scheduler stays alive."""
    # The nested job's annotation is evaluated when its def executes,
    # so ProcessContext must actually be importable here.
    from dex_ohlcv.processcontext import ProcessContext

    succeed = False
    scheduler = OracleScheduler(test_context)

    @scheduler.job(interval=datetime.timedelta(seconds=1))
    def test_job(context: ProcessContext):
        nonlocal succeed
        succeed = True

    scheduler.start()
    try:
        time.sleep(3)
    finally:
        scheduler.stop()

    assert not scheduler.is_dead()
    assert succeed is True
def test_job_crashed(test_context):
    """A background job that raises kills the scheduler."""
    # The nested job's annotation is evaluated when its def executes,
    # so ProcessContext must actually be importable here.
    from dex_ohlcv.processcontext import ProcessContext

    scheduler = OracleScheduler(test_context)

    @scheduler.job(interval=datetime.timedelta(seconds=1))
    def test_job(context: ProcessContext):
        raise RuntimeError("Oh no")

    scheduler.start()
    time.sleep(3)
    # die() has already shut the scheduler down; no explicit stop() needed.
    assert scheduler.is_dead()
def test_job_not_exceeded_time_limit(test_context):
    """Background jobs stay within their time limits."""
    # The nested job's annotation is evaluated when its def executes,
    # so ProcessContext must actually be importable here.
    from dex_ohlcv.processcontext import ProcessContext

    scheduler = OracleScheduler(test_context)

    @scheduler.job(interval=datetime.timedelta(seconds=1), max_duration=datetime.timedelta(seconds=30))
    def test_job(context: ProcessContext):
        time.sleep(4)

    scheduler.start()
    try:
        # The first run fires within ~1s; wait long enough that the job is
        # guaranteed to be running so the check below is not vacuous.
        time.sleep(2)
        assert scheduler.active_tasks
        assert not scheduler.get_exceeded_tasks()
    finally:
        # Do not leak the background scheduler into other tests.
        scheduler.stop()
def test_job_time_limit_exceeded(test_context):
    """A background job that exceeds its time limit is detected."""
    # The nested job's annotation is evaluated when its def executes,
    # so ProcessContext must actually be importable here.
    from dex_ohlcv.processcontext import ProcessContext

    scheduler = OracleScheduler(test_context)

    @scheduler.job(interval=datetime.timedelta(seconds=1), max_duration=datetime.timedelta(seconds=0))
    def test_job(context: ProcessContext):
        time.sleep(4)

    scheduler.start()
    # The first run fires within ~1s; 2s guarantees the job is running.
    time.sleep(2)
    # check_control_and_die() stops the scheduler before raising.
    with pytest.raises(TaskTimeLimitExceeded):
        scheduler.check_control_and_die()
2条答案
按热度按时间8yoxcaq71#
APScheduler无法设置作业的最大运行时间。这主要是因为 PoolExecutor 所基于的 concurrent.futures 包不支持这样的功能。由于缺少终止子进程的合适 API，APScheduler 必须提供专门的执行器才能支持此功能，更不用说还需要新增允许设置超时的作业 API。这是下一个主要版本要考虑的事情。
问题是,您希望如何处理仍在运行作业的线程?由于线程不能被强制终止,因此唯一的选择是让它继续运行,但这样仍然会使线程处于忙碌状态。
sulc1iza2#
我基于 ThreadPoolExecutor 做了一个简易（“穷人版”）实现，见下文。这段代码包含一些与内部上下文相关的代码，我懒得清理。
调度程序包装类：
示例测试: