我正在使用Celery,并且尝试使用CELERYBEAT_SCHEDULER
执行一个周期性任务。
CELERY_TIMEZONE = 'Europe/Kiev'
CELERYBEAT_SCHEDULE = {
'run-task-every-5-seconds': {
'task': 'tasks.run_every_five_seconds',
'schedule': timedelta(seconds=5),
'options': {
'expires': 10,
}
},
}
# the task
@app.task()
def run_every_five_seconds():
return '5 seconds passed'
当用celery -A celery_app beat
运行节拍时,任务似乎没有过期。然后我读到节拍可能有一些问题,所以它没有考虑过期选项。
然后我尝试执行一个任务,所以它被手动调用。
@app.task()
def print_hello():
while True:
print datetime.datetime.now()
sleep(1)
我这样称呼这个任务:
print_hello.apply_async(args=[], expires=5)
工作者的控制台告诉我,我的任务将到期,但它并没有到期,而是无限执行。
Received task: tasks.print_hello[05ee0175-cf3a-492b-9601-1450eaaf8ef7] expires:[2016-01-15 00:08:03.707062+02:00]
我做错什么了吗?
2条答案
按热度按时间dgjrabp21#
我想你理解错了
expires
的论证。文档中写道:“任务在到期时间后将不执行.”ref。表示如果到期时间已过,则不会开始执行。如果已经开始执行,则将运行到完成。
您的配置每5秒向任务队列添加一个任务。如果从任务添加到任务队列起10秒内未开始执行,则会丢弃该任务。但是,由于有空闲的celery 工作进程可用,因此会立即执行该任务。
您的代码示例添加了一个任务,如果在5秒内未开始执行,则将放弃该任务。
要获得所需的功能,可以将
'expires': 10,
替换为'expires': datetime.datetime.now() + timedelta(seconds=10),
,这会将expires
设置为绝对时间。vfh0ocws2#
为了补充前面的答案,expire参数的用途在以下位置捕获:https://github.com/celery/celery/issues/591
让我举个例子
假设您计划每5分钟执行一个任务。因此,celery beat每5分钟将任务添加到任务队列中。现在,由于某种原因,如果工作进程不工作,它将不会从任务队列中选取任何任务。任务队列随着时间的推移而增长,其中包含许多重复性任务。一旦工作进程启动,它就会有大量积压工作,并将时间浪费在执行旧任务上。
解决方案?
expires
参数。现在每个任务都有,比方说1分钟的过期时间。所以当工作者再次在线时,它会丢弃所有过期的旧任务,只处理最新的未过期任务。因此,工作者不必在旧的重复性任务上浪费时间。
最佳实践
当您不知道如何设置过期时间时,最好将其设置为等于计划/间隔。