django worker被强制关机后,如何重启celery中正在运行的任务和挂起的任务?

z4bn682m  于 2023-08-08  发布在  Go
关注(0)|答案(2)|浏览(196)

目前,我正在celery 运行任务,需要10到15分钟才能完成,但问题是我如何重新启动任务,目前正在运行的工人,也是一个目前没有运行,但等待任务运行,而我强行停止工人或在情况下,我的服务器崩溃或停止。现在发生的是,如果我再次启动celery,它不会启动最后一个正在运行的任务或剩余的任务。

l2osamch

l2osamch1#

你可以做的一件事就是在任务上启用acks_late。另外,可能值得阅读他们关于acks late和retry的FAQ部分。

@app.task(acks_late=True)
def task(*args, **kwargs):
    ...

字符串

igetnqfo

igetnqfo2#

在我的例子中,我必须设置acks_latereject_on_worker_lost

@app.task(bind=True, base=AbortableTask, acks_late=True, reject_on_worker_lost=True)
def my_task(self, experiment_pk: int):
   # Your code...

字符串
celery.py文件中,我对this gist进行了修改,它检索了所有考虑visibility_timeout参数的挂起任务(这使得任务在worker重启后等待而不被处理):

import os
from celery import Celery
from celery.signals import worker_init

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "my_proj.settings")

app = Celery("my_proj")

app.config_from_object("django.conf:settings", namespace="CELERY")

app.autodiscover_tasks()

# Gets the max between all the parameters of timeout in the tasks
max_timeout = a_vale  # This value must be bigger than the maximum soft timeout set for a task to prevent an infinity loop
app.conf.broker_transport_options = {'visibility_timeout': max_timeout + 60}  # 60 seconds of margin

def restore_all_unacknowledged_messages():
    """
    Restores all the unacknowledged messages in the queue.
    Taken from https://gist.github.com/mlavin/6671079
    """
    conn = app.connection(transport_options={'visibility_timeout': 0})
    qos = conn.channel().qos
    qos.restore_visible()
    print('Unacknowledged messages restored')

@worker_init.connect
def configure(sender=None, conf=None, **kwargs):
    restore_all_unacknowledged_messages()

相关问题