从RabbitMQ获取消息而不消耗

brjng4g3  于 2023-10-20  发布在  RabbitMQ
关注(0)|答案(3)|浏览(216)

我想获取Rabbitmq队列中存在的消息的副本,而不使用它们。有可能吗?提前感谢,

xpszyzbs

xpszyzbs1#

我想获取Rabbitmq队列中存在的消息的副本,而不使用它们。有可能吗?
最接近的选择是消费或获取消息,然后通过否定确认拒绝它。

cngwdvgl

cngwdvgl2#

也许你可以注册一个消费者(如官方文档中的here所示)* 而不给经纪人确认 *:no_ack=True

channel.basic_consume(callback, queue='hello', no_ack=True)

通过这种方式,您的消费者接收到消息内容,但消息本身不会被代理标记为 delivered(并在消费者退出时返回Ready状态)。
也许这不是最干净的方式来做你需要的,但它的工作,它的简单。
您可以采用的另一种(但类似)方法是基于所谓的 pull API(与注册 subscriber 时使用的 push API 相反);我在一个.Net应用程序中使用了这种方法:你可以在这里找到.Net文档,我认为Python API在这方面也是类似的。
关键的想法是在不给予ACK的情况下获得消息:channel.BasicGet(queueName, noAck)
我希望这可以帮助您走向全面和可靠的解决方案!

e0bqpujr

e0bqpujr3#

我发现有一种更好的方法可以通过使用channel.basic_get()函数来获取队列上的所有消息,如下面的代码所示:

def __init__(self):
        self.host = ConfigTools().get_attr('host')
        self.waiting_queue = ConfigTools().get_attr('test_queue_name')

    def view_queue(self) -> list:
        """Reads everything from the queue, then disconnects, causing the server to requeue the messages
        Returning the delivery tag is pointless at this point because the server makes the tag (an integer) up on
        the fly and the order can shuffle constantly"""
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=self.host))

        msgs = []
        while True:
            chl = connection.channel()
            method_frame, header_frame, body = chl.basic_get(queue='test')
            if method_frame:
                print("body : ", body)
                msgs.append(body)
            else:
                print("No more messages returned")
                connection.close()
                break
        return msgs

然后,如果在任何时候我知道我想弹出队列中的哪条消息,我可以使用类似的东西:

def remove(self, item) -> list:
        """Removes the item from the queue. Goes through the entire queue, similar to view_queue, and acknowledges
        the msg in the list that matches, and returns the msg.
        If item matches more than one message on the queue, only one is deleted
        """

        if isinstance(item, list):
            if not (isinstance(i, bytes) for i in item):
                print("Item must be a list of only byte objects")
        if not isinstance(item, bytes):
            print("Item must be a singe bytes object")
            raise TypeError

        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=self.host))

        msgs = []
        while True:
            chl = connection.channel()
            method_frame, header_frame, body = chl.basic_get(queue='test')
            if method_frame:
                print('body: ', body)
                if body == item:
                    print("item found!")
                    msgs.append(body)
                    chl.basic_ack(method_frame.delivery_tag)
                    connection.close()
                    return msgs

            else:
                print("Message not found")
                connection.close()
                break
        return msgs
  • 注意:* 我将此用于一个小型应用程序-队列上的消息永远少于50条。我不能说这个函数在更大的应用程序中如何保持。

相关问题