rabbitmq 兔MQ鼠兔:无法停止使用流队列,因为它不允许nack或拒绝

rqdpfwrv  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(1)|浏览(219)

我的应用程序有一个Stream队列,我想让它在满足特定条件时停止消费。由于start_consuming永远被BlockingConnection阻塞,所以看起来唯一的出路是在回调中调用stop_consuming。
不幸的是,这不起作用,因为stop_consuming将拒绝所有待定消息(来自pika docs)
注意:挂起的不可确认消息将丢失;未决的可确认消息将被拒绝。
...您无法拒绝流队列,因为此错误消息确认:
operation basic.reject caused a connection exception not_implemented: "basic.nack and basic.reject not supported by stream queues queue 'stream-queue' in vhost '/'"
下面是一个简单的例子。服务器代码:


# !/usr/bin/env python

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)
channel_stream = connection.channel()

channel_stream.queue_declare(
    "stream-queue",
    auto_delete=False, exclusive=False, durable=True,
    arguments={
        'x-queue-type': 'stream',
    }
)
channel_stream.basic_qos(
    prefetch_count=1,
)

class Server(object):
    def __init__(self):
        channel_stream.basic_consume(
            queue="stream-queue",
            on_message_callback=self.stream_callback,
        )

    def stream_callback(self, channel, method, props, body):
        print(f"received '{body.decode()}' via {method.routing_key}")
        channel_stream.stop_consuming()

server = Server()

try:
    channel_stream.start_consuming()
except KeyboardInterrupt:
    connection.close()

和客户端代码:


# !/usr/bin/env python

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)
channel_stream = connection.channel()

channel_stream.queue_declare(
    "stream-queue",
    durable=True,
    arguments={
        'x-queue-type': 'stream',
    }
)

# This publishes twice: once to trigger the server's callback

# and again to get a message in the queue that the server is forced

# to nack, causing a crash.

for i in range(2):
    channel_stream.basic_publish(
        exchange='',
        routing_key='stream-queue',
        body=f"stream data".encode()
    )
connection.close()

而全输出:

~/anaconda3/envs/py310/bin/python ~/workspace/rabbitmq_train/stream_bug/server_stream_only.py 
received 'stream data' via stream-queue
Traceback (most recent call last):
  File "~/workspace/rabbitmq_train/stream_bug/server_stream_only.py", line 36, in <module>
    channel_stream.start_consuming()
  File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 1880, in start_consuming
    self._process_data_events(time_limit=None)
  File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 2041, in _process_data_events
    self.connection.process_data_events(time_limit=time_limit)
  File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 848, in process_data_events
    self._dispatch_channel_events()
  File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 567, in _dispatch_channel_events
    impl_channel._get_cookie()._dispatch_events()
  File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 1507, in _dispatch_events
    consumer_info.on_message_callback(self, evt.method,
  File "~/workspace/rabbitmq_train/stream_bug/server_stream_only.py", line 30, in stream_callback
    channel_stream.stop_consuming()
  File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 1893, in stop_consuming
    self._cancel_all_consumers()
  File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 1494, in _cancel_all_consumers
    self.basic_cancel(consumer_tag)
  File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 1802, in basic_cancel
    self._flush_output(
  File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 1350, in _flush_output
    self._connection._flush_output(lambda: self.is_closed, *waiters)
  File "~/anaconda3/envs/py310/lib/python3.10/site-packages/pika/adapters/blocking_connection.py", line 523, in _flush_output
    raise self._closed_result.value.error
pika.exceptions.ConnectionClosedByBroker: (540, "NOT_IMPLEMENTED - basic.nack and basic.reject not supported by stream queues queue 'stream-queue' in vhost '/'")

Process finished with exit code 1

这是我启动服务器的方式:

docker run -it --rm -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
watbbzwu

watbbzwu1#

你不能对不同类型的队列使用相同的channel。一般来说,这不是最佳实践。你应该为每个消费者提供一个通道。
若要解决您的问题:


# !/usr/bin/env python

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
channelStream = connection.channel()

channelStream.queue_declare(
    "stream-queue",
    auto_delete=False, exclusive=False, durable=True,
    arguments={
        'x-queue-type': 'stream',
    }
)
channel.queue_declare(
    "normal-queue"
)
channelStream.basic_qos(
    prefetch_count=1,
)

class Server(object):
    def __init__(self):
        channelStream.basic_consume(
            queue="stream-queue",
            on_message_callback=self.stream_callback,
        )
        channel.basic_consume(
            queue="normal-queue",
            on_message_callback=self.normal_callback,
        )

    def stream_callback(self, channel, method, props, body):
        print(f"received '{body}' via {method.routing_key}")
        channelStream.basic_ack(delivery_tag=method.delivery_tag)
        channelStream.stop_consuming()

    def normal_callback(self, channel, method, props, body):
        print(f"received '{body}' via {method.routing_key}")
        channel.basic_ack(delivery_tag=method.delivery_tag)
        channel.stop_consuming()

server = Server()

try:
    channel.start_consuming()
    channelStream.start_consuming()
except KeyboardInterrupt:
    connection.close()

多线程将是:


# !/usr/bin/env python

import threading

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)

connectionStream = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
channelStream = connectionStream.channel()

channelStream.queue_declare(
    "stream-queue",
    auto_delete=False, exclusive=False, durable=True,
    arguments={
        'x-queue-type': 'stream',
    }
)
channel.queue_declare(
    "normal-queue"
)
channelStream.basic_qos(
    prefetch_count=1,
)

class Server(object):
    def __init__(self):
        pass

    def stream_callback(self, channel, method, props, body):
        print(f"stream_callback '{body}' via {method.routing_key}")
        channelStream.basic_ack(delivery_tag=method.delivery_tag)
        channelStream.stop_consuming()

    def normal_callback(self, channel, method, props, body):
        print(f"normal_callback '{body}' via {method.routing_key}")
        channel.basic_ack(delivery_tag=method.delivery_tag)
        channel.stop_consuming()

    def start_consume_normal(self):
        print(f"start_consume_normal ")
        channel.basic_consume(
            queue="normal-queue",
            on_message_callback=self.normal_callback,
        )
        channel.start_consuming()

    def start_consume_stream(self):
        channelStream.basic_consume(
            queue="stream-queue",
            on_message_callback=self.stream_callback,
        )
        print(f"start_consume_stream ")
        channelStream.start_consuming()

server = Server()

try:
    x = threading.Thread(target=server.start_consume_normal, args=())
    x.start()
    print(f"start_consume_normal")
    x2 = threading.Thread(target=server.start_consume_stream, args=())
    x2.start()
    print(f"start_consume_stream")
    input()
except KeyboardInterrupt:
    connection.close()

Note about multi-thread

相关问题