Celery cannot send tasks to the queue when running in Docker

Posted by xu3bshqb on 2023-08-03 in Docker

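I tested it on Windows and it worked fine, but now I want to run it with Docker. The problem is that when I try to execute the task that sends an email to the user, I get the error [Errno 111] Connection refused, even though Celery starts successfully and connects to RabbitMQ. Why can't Celery send tasks to RabbitMQ?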

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/kombu/utils/functional.py", line 32, in __call__
    return self.__value__
           ^^^^^^^^^^^^^^

During handling of the above exception ('ChannelPromise' object has no attribute '__value__'), another exception occurred:
  File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 472, in _reraise_as_library_errors
    yield
    ^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 459, in _ensure_connection
    return retry_over_time(
           
  File "/usr/local/lib/python3.11/dist-packages/kombu/utils/functional.py", line 318, in retry_over_time
    return fun(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 941, in _connection_factory
    self._connection = self._establish_connection()
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 867, in _establish_connection
    conn = self.transport.establish_connection()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/transport/pyamqp.py", line 203, in establish_connection
    conn.connect()
    ^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/amqp/connection.py", line 323, in connect
    self.transport.connect()
    ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/amqp/transport.py", line 129, in connect
    self._connect(self.host, self.port, self.connect_timeout)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/amqp/transport.py", line 184, in _connect
    self.sock.connect(sa)
    ^^^^^^^^^^^^^^^^^^^^^

The above exception ([Errno 111] Connection refused) was the direct cause of the following exception:
  File "/usr/local/lib/python3.11/dist-packages/django/core/handlers/exception.py", line 55, in inner
    response = get_response(request)
               ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/django/core/handlers/base.py", line 197, in _get_response
    response = wrapped_callback(request, *callback_args, **callback_kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/website/journal_website/views.py", line 281, in register_new_user
    send_email_message_to_user_with_activation_link.delay(new_user.pk, new_user_additional_data.code)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/celery/app/task.py", line 444, in delay
    return self.apply_async(args, kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/celery/app/task.py", line 594, in apply_async
    return app.send_task(
           
  File "/usr/local/lib/python3.11/dist-packages/celery/app/base.py", line 798, in send_task
    amqp.send_task_message(P, name, message, **options)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/celery/app/amqp.py", line 517, in send_task_message
    ret = producer.publish(
          
  File "/usr/local/lib/python3.11/dist-packages/kombu/messaging.py", line 186, in publish
    return _publish(
           
  File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 563, in _ensured
    return fun(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/messaging.py", line 195, in _publish
    channel = self.channel
              ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/messaging.py", line 218, in _get_channel
    channel = self._channel = channel()
                              ^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/utils/functional.py", line 34, in __call__
    value = self.__value__ = self.__contract__()
                             ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/messaging.py", line 234, in <lambda>
    channel = ChannelPromise(lambda: connection.default_channel)
                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 960, in default_channel
    self._ensure_connection(**conn_opts)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 458, in _ensure_connection
    with ctx():
    ^^^^^
  File "/usr/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 476, in _reraise_as_library_errors
    raise ConnectionError(str(exc)) from exc
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

docker-compose.yml:

version: "3.0"

services:
  # WEB
  django:
    build: .
    command: python3.11 manage.py runserver 0.0.0.0:8000
    container_name: django-server
    volumes:
      - media_volume:/website/journal_website/media
      - static_volume:/website/journal_website/static
      - database_volume:/website/journal_website/database
    ports:
      - "8000:8000"
    depends_on:
      - rabbit

  # Celery
  celery:
    build: .
    command: celery -A website worker -l info
    container_name: celery
    depends_on:
      - rabbit
  
  # RabbitMQ
  rabbit:
    hostname: rabbit
    container_name: rabbitmq
    image: rabbitmq:3.12-rc-management
    ports:
      # AMQP protocol port
      - "5672:5672"
      # HTTP management UI
      - "15672:15672"
    restart: always
      

volumes:
  media_volume:
  static_volume:
  database_volume:


celery.py:

from __future__ import absolute_import, unicode_literals
import os

from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "website.settings")
celery_application = Celery("website")
celery_application.config_from_object("django.conf:settings", namespace="CELERY")
celery_application.conf.broker_url = "amqp://rabbit:5672"
celery_application.autodiscover_tasks()


tasks.py:

from __future__ import absolute_import, unicode_literals
from celery import shared_task

# Some imports...

@shared_task
def send_email_message_to_user_with_activation_link(target_user_id: int, code: UUID) -> HttpResponse | None:
    target_user = User.objects.get(pk=target_user_id)
    content = {
        "email": target_user.email,
        "domain": "127.0.0.1:8000",
        "site_name": "Website",
        "user": target_user,
        "protocol": "http",
        "code": code,
    }

    message = render_to_string("user/account_activation/account_activation_email.txt", content)
    try:
        send_mail("Account activation", message, "admin@example.com" , [target_user.email], fail_silently=False)
    except BadHeaderError:
        return HttpResponse("Invalid header found.")

dluptydi (answer #1)

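I tried your docker-compose file and it works fine for me. The only issue is that RabbitMQ takes about 20 seconds to start and accept connections. You may see connection timeout errors until it is up, but once it has started, the Celery worker establishes a connection and works normally.
For development, it is best to leave the RabbitMQ container running and restart only the Celery container when you make changes. While RabbitMQ itself is starting up, however, you may always hit timeouts.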
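
One way to ride out the startup delay described above is to poll the broker's port before starting anything that needs it. The following is a minimal sketch using only the standard library; the function name `wait_for_port` and its default timings are assumptions made for illustration, not part of Celery or RabbitMQ. Note that an open TCP port only shows that something is listening, so it is a rough readiness signal rather than proof that RabbitMQ has finished initializing.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 60.0, interval: float = 1.0) -> bool:
    """Poll until a TCP connection to (host, port) succeeds or `timeout` seconds pass.

    Hypothetical helper: returns True once a connection is accepted,
    False if the deadline passes without a successful connection.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError (e.g. "Connection refused")
            # while nothing is listening on the port yet.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


# Example call, using the service name and AMQP port from the
# docker-compose.yml in the question:
#     wait_for_port("rabbit", 5672, timeout=60.0)
```

Such a check could run in a container entrypoint before the `celery` or `runserver` command. `depends_on` as used in the question only orders container startup; it does not wait for RabbitMQ to accept connections.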

ykejflvf (answer #2)

Solved; it now works on Linux with WSL 2. First, I needed a few lines of code in the __init__.py file in the project directory:

from .celery import celery_application as celery_app
__all__ = ["celery_app"]

Then I used a shared database volume for the django and celery containers in the docker-compose file:

# Celery worker
  celery_worker:
    command: celery -A website worker -l info
    container_name: celery_worker
    volumes:
      - database_volume:/website/journal_website/database # just add this to celery worker service.
    image: django-image # you need to use common image for django and celery containers that is created from Dockerfile.
    depends_on:
      - rabbitmq
      - django
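
A likely reason the `__init__.py` step matters: the worker is started with `-A website`, so it loads the configured app and reaches `rabbit`, but if the Django process never imports `celery_application`, `@shared_task` can fall back to a default app that uses Celery's default `broker_url` of `amqp://`. The sketch below, which uses only the standard library, shows where such a URL points. The helper name `broker_endpoint` is hypothetical, and the defaults (`localhost`, port 5672) are the usual AMQP defaults, stated here as an assumption rather than read from Celery itself.

```python
from urllib.parse import urlsplit

# Assumption: when an AMQP URL omits host or port, the client falls back
# to localhost and the standard AMQP port 5672.
DEFAULT_HOST = "localhost"
DEFAULT_PORT = 5672


def broker_endpoint(broker_url: str) -> tuple[str, int]:
    """Return the (host, port) a broker URL points at, filling in the defaults."""
    parts = urlsplit(broker_url)
    return parts.hostname or DEFAULT_HOST, parts.port or DEFAULT_PORT


# The default URL resolves to localhost, which inside the django container
# is the container itself, where no broker is listening.
print(broker_endpoint("amqp://"))             # ('localhost', 5672)
# The URL from celery.py resolves to the RabbitMQ service name.
print(broker_endpoint("amqp://rabbit:5672"))  # ('rabbit', 5672)
```

This would be consistent with the symptoms in the question: the worker connects, while `.delay()` called from the Django view gets [Errno 111] Connection refused.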
