在Azure服务总线中,如何读取死信队列中的最新消息?

vs3odd8k  于 2023-10-22  发布在  其他
关注(0)|答案(1)|浏览(141)

在服务总线的死信队列中大约有9000条消息。我的要求是,在peek模式下,我应该只阅读插入队列中的最新消息。但是,在azure文档中,没有参数来读取最新的消息,而是从第一条消息开始。
那么,有没有一种方法可以获得最新的消息,而不从DLQ中删除它?
这是当前的代码:

servicebus_client = ServiceBusClient.from_connection_string(conn_str=cs,
                                                            # logging_enable=True,
                                                            retry_total=5, retry_backoff_factor=5, retry_mode="fixed",
                                                            transport_type=TransportType.AmqpOverWebsocket
                                                            )

dlq_receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, 
                                                    sub_queue=ServiceBusSubQueue.DEAD_LETTER, 
                                                    receive_mode=ServiceBusReceiveMode.PEEK_LOCK)

with dlq_receiver:
        while True:
            received_msgs = dlq_receiver.receive_messages(max_message_count=1, max_wait_time=5)
            for msg in received_msgs:
                    try:
                        decoded_value = msg.body.decode('utf-8')
                    except UnicodeDecodeError:
                        decoded_value = gzip.decompress(msg.body)
                        decoded_value = decoded_value.decode('utf-8')
                        print(decoded_value)
                    print("Body type: {}".format(msg.body_type))
                    print("Time to live: {}".format(msg.time_to_live))
                    print("Sequence number: {}".format(msg.sequence_number))
                    print("Enqueue Sequence number: {}".format(msg.enqueued_sequence_number))
                    print("Partition Key: {}".format(msg.partition_key))
                    print("Locked until: {}".format(msg.locked_until_utc))
                    print("Lock Token: {}".format(msg.lock_token))
                    print("Dead Letter Reason: {}".format(msg.dead_letter_reason))
                    print("Enqueued time: {}".format(msg.enqueued_time_utc))

对于输入,我可以注意到有些消息被重新读取(即,我看到的是相同的序列号)。如何实现我的用例?

byqmnocz

byqmnocz1#

Azure服务总线不会从队列末尾读取消息。它总是按照消息排队的顺序检索消息。如果你需要一种自定义的方式来访问你的数据,使用一个数据库或一个支持查询的服务(数据库,带有Blob索引的Blob,等等)。您可以将队列与索引合并结合起来,以便将消息保留在队列中,并能够在以后按顺序接收消息,但这种方法稍微复杂一些。

相关问题