rabbitmq: Django pytest gives Kombu connection refused [111] when using RabbitMQ and Celery

h79rfbju posted on 2022-11-08 in RabbitMQ

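I am currently using pytest, Django, and rabbitmq-server with Celery for background tasks. However, I noticed that when I run pytest on my local machine without the server running, it produces a very long exception trace, in particular: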

E: Kombu connection refused [111]

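I am running tests for a model's save() method, which triggers a background Celery task.
Is there a way to start rabbitmq-server from within pytest, without actually starting it on my local machine, so that my test cases can pass?
This also affects my GitHub CI/CD, because I don't know how to run the rabbitmq image as a service in the github-actions.yml file.
Thanks for your help.
If you like, you can read through the log below.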

self = <promise: 0x7fd0819d9d80>

    def __call__(self):
        try:
>           return self.__value__
E           AttributeError: 'ChannelPromise' object has no attribute '__value__'

../../../.local/lib/python3.10/site-packages/kombu/utils/functional.py:30: AttributeError

During handling of the above exception, another exception occurred:

self = <Connection: amqp://guest:**@127.0.0.1:5672// at 0x7fd082053010>
ConnectionError = <class 'kombu.exceptions.OperationalError'>
ChannelError = <class 'kombu.exceptions.OperationalError'>

    @contextmanager
    def _reraise_as_library_errors(
            self,
            ConnectionError=exceptions.OperationalError,
            ChannelError=exceptions.OperationalError):
        try:
>           yield

../../../.local/lib/python3.10/site-packages/kombu/connection.py:446: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Connection: amqp://guest:**@127.0.0.1:5672// at 0x7fd082053010>
errback = None, max_retries = None, interval_start = 2, interval_step = 2
interval_max = 30, callback = None, reraise_as_library_errors = True
timeout = 4

    def _ensure_connection(
        self, errback=None, max_retries=None,
        interval_start=2, interval_step=2, interval_max=30,
        callback=None, reraise_as_library_errors=True,
        timeout=None
    ):
        """Ensure we have a connection to the server.

        If not retry establishing the connection with the settings
        specified.

        Arguments:
            errback (Callable): Optional callback called each time the
                connection can't be established.  Arguments provided are
                the exception raised and the interval that will be
                slept ``(exc, interval)``.

            max_retries (int): Maximum number of times to retry.
                If this limit is exceeded the connection error
                will be re-raised.

            interval_start (float): The number of seconds we start
                sleeping for.
            interval_step (float): How many seconds added to the interval
                for each retry.
            interval_max (float): Maximum number of seconds to sleep between
                each retry.
            callback (Callable): Optional callback that is called for every
                internal iteration (1 s).
            timeout (int): Maximum amount of time in seconds to spend
                waiting for connection
        """
        if self.connected:
            return self._connection

        def on_error(exc, intervals, retries, interval=0):
            round = self.completes_cycle(retries)
            if round:
                interval = next(intervals)
            if errback:
                errback(exc, interval)
            self.maybe_switch_next()  # select next host

            return interval if round else 0

        ctx = self._reraise_as_library_errors
        if not reraise_as_library_errors:
            ctx = self._dummy_context
        with ctx():
>           return retry_over_time(
                self._connection_factory, self.recoverable_connection_errors,
                (), {}, on_error, max_retries,
                interval_start, interval_step, interval_max,
                callback, timeout=timeout
            )

../../../.local/lib/python3.10/site-packages/kombu/connection.py:433: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

fun = <bound method Connection._connection_factory of <Connection: amqp://guest:**@127.0.0.1:5672// at 0x7fd082053010>>
catch = (<class 'amqp.exceptions.RecoverableConnectionError'>, <class 'OSError'>, <class 'OSError'>, <class 'OSError'>)
args = [], kwargs = {}
errback = <function Connection._ensure_connection.<locals>.on_error at 0x7fd081f0c790>
max_retries = None, interval_start = 2, interval_step = 2, interval_max = 30
callback = None, timeout = 4

    def retry_over_time(fun, catch, args=None, kwargs=None, errback=None,
                        max_retries=None, interval_start=2, interval_step=2,
                        interval_max=30, callback=None, timeout=None):
        """Retry the function over and over until max retries is exceeded.

        For each retry we sleep a for a while before we try again, this interval
        is increased for every retry until the max seconds is reached.

        Arguments:
            fun (Callable): The function to try
            catch (Tuple[BaseException]): Exceptions to catch, can be either
                tuple or a single exception class.

        Keyword Arguments:
            args (Tuple): Positional arguments passed on to the function.
            kwargs (Dict): Keyword arguments passed on to the function.
            errback (Callable): Callback for when an exception in ``catch``
                is raised.  The callback must take three arguments:
                ``exc``, ``interval_range`` and ``retries``, where ``exc``
                is the exception instance, ``interval_range`` is an iterator
                which return the time in seconds to sleep next, and ``retries``
                is the number of previous retries.
            max_retries (int): Maximum number of retries before we give up.
                If neither of this and timeout is set, we will retry forever.
                If one of this and timeout is reached, stop.
            interval_start (float): How long (in seconds) we start sleeping
                between retries.
            interval_step (float): By how much the interval is increased for
                each retry.
            interval_max (float): Maximum number of seconds to sleep
                between retries.
            timeout (int): Maximum seconds waiting before we give up.
        """
        kwargs = {} if not kwargs else kwargs
        args = [] if not args else args
        interval_range = fxrange(interval_start,
                                 interval_max + interval_start,
                                 interval_step, repeatlast=True)
        end = time() + timeout if timeout else None
        for retries in count():
            try:
>               return fun(*args,**kwargs)

../../../.local/lib/python3.10/site-packages/kombu/utils/functional.py:312: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Connection: amqp://guest:**@127.0.0.1:5672// at 0x7fd082053010>

    def _connection_factory(self):
        self.declared_entities.clear()
        self._default_channel = None
>       self._connection = self._establish_connection()

../../../.local/lib/python3.10/site-packages/kombu/connection.py:877: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Connection: amqp://guest:**@127.0.0.1:5672// at 0x7fd082053010>

    def _establish_connection(self):
        self._debug('establishing connection...')
>       conn = self.transport.establish_connection()

../../../.local/lib/python3.10/site-packages/kombu/connection.py:812: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <kombu.transport.pyamqp.Transport object at 0x7fd081b67490>

    def establish_connection(self):
        """Establish connection to the AMQP broker."""
        conninfo = self.client
        for name, default_value in self.default_connection_params.items():
            if not getattr(conninfo, name, None):
                setattr(conninfo, name, default_value)
        if conninfo.hostname == 'localhost':
            conninfo.hostname = '127.0.0.1'
        # when server_hostname is None, use hostname from URI.
        if isinstance(conninfo.ssl, dict) and \
                'server_hostname' in conninfo.ssl and \
                conninfo.ssl['server_hostname'] is None:
            conninfo.ssl['server_hostname'] = conninfo.hostname
        opts = dict({
            'host': conninfo.host,
            'userid': conninfo.userid,
            'password': conninfo.password,
            'login_method': conninfo.login_method,
            'virtual_host': conninfo.virtual_host,
            'insist': conninfo.insist,
            'ssl': conninfo.ssl,
            'connect_timeout': conninfo.connect_timeout,
            'heartbeat': conninfo.heartbeat,
        },**conninfo.transport_options or {})
        conn = self.Connection(**opts)
        conn.client = self.client
>       conn.connect()

../../../.local/lib/python3.10/site-packages/kombu/transport/pyamqp.py:201: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <AMQP Connection: 127.0.0.1:5672// (disconnected) at 0x7fd081b2ace0>
callback = None

    def connect(self, callback=None):
        # Let the transport.py module setup the actual
        # socket connection to the broker.
        #
        if self.connected:
            return callback() if callback else None
        try:
            self.transport = self.Transport(
                self.host, self.connect_timeout, self.ssl,
                self.read_timeout, self.write_timeout,
                socket_settings=self.socket_settings,
            )
>           self.transport.connect()

../../../.local/lib/python3.10/site-packages/amqp/connection.py:323: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <TCPTransport: (disconnected) at 0x7fd081c8fac0>

    def connect(self):
        try:
            # are we already connected?
            if self.connected:
                return
>           self._connect(self.host, self.port, self.connect_timeout)

../../../.local/lib/python3.10/site-packages/amqp/transport.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <TCPTransport: (disconnected) at 0x7fd081c8fac0>, host = '127.0.0.1'
port = 5672, timeout = 4

    def _connect(self, host, port, timeout):
        entries = socket.getaddrinfo(
            host, port, socket.AF_UNSPEC, socket.SOCK_STREAM, SOL_TCP,
        )
        for i, res in enumerate(entries):
            af, socktype, proto, canonname, sa = res
            try:
                self.sock = socket.socket(af, socktype, proto)
                try:
                    set_cloexec(self.sock, True)
                except NotImplementedError:
                    pass
                self.sock.settimeout(timeout)
>               self.sock.connect(sa)
E               ConnectionRefusedError: [Errno 111] Connection refused

../../../.local/lib/python3.10/site-packages/amqp/transport.py:184: ConnectionRefusedError

The above exception was the direct cause of the following exception:

api_client = <utils.tenancy_api_client.TenantAPIClient object at 0x7fd081f1fdc0>
stock_json = {'collect_on': '2022-03-07', 'owner': {'address': 'Los Angeles', 'email': 'owner@email.com', 'first_name': 'Lukas', 'i...mZQknaVNvcXKiEtmlWxxTWGjpBzZbrCSDkkKhkhlIoLZeputqKZNSdfbKlQTJzWBAgfZIoHiOTTusdXAQoCYHVXiQIqrhUpIvMNFhYvHUWRcOxJff'}]}]}
celery_config = {'broker_url': 'amqp://'}, celery_worker_parameters = {}

    def test_stock_created_with_valid_data(api_client, stock_json, celery_config, celery_worker_parameters):
        """ tests stock is created with valid json """
>       res = api_client.post(reverse_lazy("core_api:stocks-list"), stock_json, format="json")

6pp0gazn1#

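OK, I fixed it. rabbitmq-server is a server that has to be running before pytest starts, just as PostgreSQL (a server-based database) has to be.
For the CI side with GitHub Actions, the problem was how I had configured the port for the rabbitmq image, so I changed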
runs-on: ubuntu-latest
services:
  rabbitmq:
    image: rabbitmq:3.8
    env:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
    ports:
      - 5672

to

runs-on: ubuntu-latest
services:
  rabbitmq:
    image: rabbitmq:3.8
    env:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
    ports: ["5672:5672"]
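
Even with the port published, a freshly started broker container may need a few seconds before it accepts connections. As a rough sketch only, a setup step could wait for the port before pytest runs. The helper name `wait_for_broker` and its default values are hypothetical; the host and port are the ones shown in the traceback (127.0.0.1:5672).

```python
import socket
import time


def wait_for_broker(host="127.0.0.1", port=5672, timeout=30.0, interval=0.5):
    """Return True once a TCP connection to host:port succeeds, False on timeout.

    Hypothetical helper: an open port only shows that something is listening,
    not that RabbitMQ has finished starting up.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError (e.g. ConnectionRefusedError,
            # the Errno 111 from the traceback) while nothing is listening.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

Calling `wait_for_broker()` at the start of the test session (for example from a session-scoped fixture) and failing early when it returns False would give a one-line error instead of the long trace in the question.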

GitHub now passes perfectly, and no more Kombu connection refused :)
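
For the local case in the question, where no broker is running at all, one possible alternative is Celery's eager mode, which runs tasks synchronously in the calling process instead of publishing them. The fragment below is a sketch under assumptions: the module name `settings_test.py` is hypothetical, and it assumes the project's Celery app loads its configuration from Django settings with the `CELERY` namespace.

```python
# settings_test.py (hypothetical test-only Django settings module)
# Assumes the Celery app is configured with
# app.config_from_object("django.conf:settings", namespace="CELERY").

# Execute tasks locally when .delay() / .apply_async() is called,
# so no message is sent to RabbitMQ.
CELERY_TASK_ALWAYS_EAGER = True

# Re-raise exceptions from eagerly executed tasks so failures show up in tests.
CELERY_TASK_EAGER_PROPAGATES = True
```

Eager mode does not exercise the broker or a real worker, so it complements the RabbitMQ service in CI rather than replacing it.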
