import os
from celery import Celery
from celery.signals import worker_init
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "my_proj.settings")
app = Celery("my_proj")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
# Gets the max between all the parameters of timeout in the tasks
max_timeout = a_vale # This value must be bigger than the maximum soft timeout set for a task to prevent an infinity loop
app.conf.broker_transport_options = {'visibility_timeout': max_timeout + 60} # 60 seconds of margin
def restore_all_unacknowledged_messages():
"""
Restores all the unacknowledged messages in the queue.
Taken from https://gist.github.com/mlavin/6671079
"""
conn = app.connection(transport_options={'visibility_timeout': 0})
qos = conn.channel().qos
qos.restore_visible()
print('Unacknowledged messages restored')
@worker_init.connect
def configure(sender=None, conf=None, **kwargs):
restore_all_unacknowledged_messages()
2条答案
按热度按时间l2osamch1#
你可以做的一件事就是在任务上启用
acks_late
。另外,可能值得阅读他们关于acks late和retry的FAQ部分。字符串
igetnqfo2#
在我的例子中,我必须设置
acks_late
和reject_on_worker_lost
:字符串
在
celery.py
文件中,我对this gist进行了修改,它检索了所有考虑visibility_timeout
参数的挂起任务(这使得任务在worker重启后等待而不被处理):型