python 通知celery任务工作进程关闭

evrscar2  于 2023-09-29  发布在  Python
关注(0)|答案(2)|浏览(219)

我使用celery 2.4.1和python 2.6,rabbitmq后端和django。我希望我的任务能够妥善清理,如果工人关闭。据我所知,你不能提供一个任务析构函数,所以我尝试挂钩到worker_shutdown信号。
注意:AbortableTask只适用于数据库后端,所以我不能使用它。

  1. from celery.signals import worker_shutdown
  2. @task
  3. def mytask(*args)
  4. obj = DoStuff()
  5. def shutdown_hook(*args):
  6. print "Worker shutting down"
  7. # cleanup nicely
  8. obj.stop()
  9. worker_shutdown.connect(shutdown_hook)
  10. # blocking call that monitors a network connection
  11. obj.stuff()

但是,关闭钩子永远不会被调用。Ctrl-C 'ing工人不会杀死任务,我必须从shell手动杀死它。
那么,如果这不是正确的方法,我如何允许任务优雅地关闭呢?

jchrr9hc

jchrr9hc1#

worker_shutdown仅由MainProcess发送,而不是子池工作进程。所有worker_*信号except for worker_process_init,参考MainProcess
但是,关闭钩子永远不会被调用。Ctrl-C 'ing工人不会杀死任务,我必须从shell手动杀死它。
worker在正常(热)关机状态下从不终止任务。即使任务需要几天才能完成,工人也不会完全关闭,直到它完成。您可以将--soft-time-limit--time-limit设置为,以告知示例何时可以终止任务。
因此,要添加任何类型的进程清理进程,您首先需要确保任务可以实际完成。因为在那之前不会有大扫除。
要向池工作进程添加清理步骤,可以使用以下命令:

  1. from celery import platforms
  2. from celery.signals import worker_process_init
  3. def cleanup_after_tasks(signum, frame):
  4. # reentrant code here (see http://docs.python.org/library/signal.html)
  5. def install_pool_process_sighandlers(**kwargs):
  6. platforms.signals["TERM"] = cleanup_after_tasks
  7. platforms.signals["INT"] = cleanup_after_tasks
  8. worker_process_init.connect(install_pool_process_sighandlers)
展开查看全部
pkbketx9

pkbketx92#

使用worker_shuttling_down信号。参见https://docs.celeryq.dev/en/stable/userguide/signals.html#worker-shutting-down和https://stackoverflow.com/a/55481656/237091以获取示例。

相关问题