python Celery的过期选项不起作用

smtd7mpg  于 2022-12-17  发布在  Python
关注(0)|答案(2)|浏览(184)

我正在使用Celery,并且尝试使用CELERYBEAT_SCHEDULER执行一个周期性任务。

CELERY_TIMEZONE = 'Europe/Kiev'
CELERYBEAT_SCHEDULE = {
    'run-task-every-5-seconds': {
        'task': 'tasks.run_every_five_seconds',
        'schedule': timedelta(seconds=5),
        'options': {
            'expires': 10,
        }
    },
}

# the task
@app.task()
def run_every_five_seconds():
   return '5 seconds passed'

当用celery -A celery_app beat运行节拍时,任务似乎没有过期。然后我读到节拍可能有一些问题,所以它没有考虑过期选项。
然后我尝试执行一个任务,所以它被手动调用。

@app.task()
def print_hello():
    while True:
        print datetime.datetime.now()
        sleep(1)

我这样称呼这个任务:

print_hello.apply_async(args=[], expires=5)

工作者的控制台告诉我,我的任务将到期,但它并没有到期,而是无限执行。

Received task: tasks.print_hello[05ee0175-cf3a-492b-9601-1450eaaf8ef7] expires:[2016-01-15 00:08:03.707062+02:00]

我做错什么了吗?

dgjrabp2

dgjrabp21#

我想你理解错了expires的论证。
文档中写道:“任务在到期时间后将不执行.”ref。表示如果到期时间已过,则不会开始执行。如果已经开始执行,则将运行到完成。
您的配置每5秒向任务队列添加一个任务。如果从任务添加到任务队列起10秒内未开始执行,则会丢弃该任务。但是,由于有空闲的celery 工作进程可用,因此会立即执行该任务。
您的代码示例添加了一个任务,如果在5秒内未开始执行,则将放弃该任务。
要获得所需的功能,可以将'expires': 10,替换为'expires': datetime.datetime.now() + timedelta(seconds=10),,这会将expires设置为绝对时间。

vfh0ocws

vfh0ocws2#

为了补充前面的答案,expire参数的用途在以下位置捕获:https://github.com/celery/celery/issues/591

让我举个例子

假设您计划每5分钟执行一个任务。因此,celery beat每5分钟将任务添加到任务队列中。现在,由于某种原因,如果工作进程不工作,它将不会从任务队列中选取任何任务。任务队列随着时间的推移而增长,其中包含许多重复性任务。一旦工作进程启动,它就会有大量积压工作,并将时间浪费在执行旧任务上。

解决方案?expires参数。

现在每个任务都有,比方说1分钟的过期时间。所以当工作者再次在线时,它会丢弃所有过期的旧任务,只处理最新的未过期任务。因此,工作者不必在旧的重复性任务上浪费时间。

最佳实践

当您不知道如何设置过期时间时,最好将其设置为等于计划/间隔。

相关问题