在服务总线的死信队列中大约有9000条消息。我的要求是,在peek模式下,我应该只阅读插入队列中的最新消息。但是,在azure文档中,没有参数来读取最新的消息,而是从第一条消息开始。
那么,有没有一种方法可以获得最新的消息,而不从DLQ中删除它?
这是当前的代码:
servicebus_client = ServiceBusClient.from_connection_string(conn_str=cs,
# logging_enable=True,
retry_total=5, retry_backoff_factor=5, retry_mode="fixed",
transport_type=TransportType.AmqpOverWebsocket
)
dlq_receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME,
sub_queue=ServiceBusSubQueue.DEAD_LETTER,
receive_mode=ServiceBusReceiveMode.PEEK_LOCK)
with dlq_receiver:
while True:
received_msgs = dlq_receiver.receive_messages(max_message_count=1, max_wait_time=5)
for msg in received_msgs:
try:
decoded_value = msg.body.decode('utf-8')
except UnicodeDecodeError:
decoded_value = gzip.decompress(msg.body)
decoded_value = decoded_value.decode('utf-8')
print(decoded_value)
print("Body type: {}".format(msg.body_type))
print("Time to live: {}".format(msg.time_to_live))
print("Sequence number: {}".format(msg.sequence_number))
print("Enqueue Sequence number: {}".format(msg.enqueued_sequence_number))
print("Partition Key: {}".format(msg.partition_key))
print("Locked until: {}".format(msg.locked_until_utc))
print("Lock Token: {}".format(msg.lock_token))
print("Dead Letter Reason: {}".format(msg.dead_letter_reason))
print("Enqueued time: {}".format(msg.enqueued_time_utc))
对于输入,我可以注意到有些消息被重新读取(即,我看到的是相同的序列号)。如何实现我的用例?
1条答案
按热度按时间byqmnocz1#
Azure服务总线不会从队列末尾读取消息。它总是按照消息排队的顺序检索消息。如果你需要一种自定义的方式来访问你的数据,使用一个数据库或一个支持查询的服务(数据库,带有Blob索引的Blob,等等)。您可以将队列与索引合并结合起来,以便将消息保留在队列中,并能够在以后按顺序接收消息,但这种方法稍微复杂一些。