如何处理消息队列中出现的无序消息?

exdqitrt  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(654)

有一次我在一次采访中被问到,你会如何处理消息队列中无序出现的消息。已经有一段时间了,我还没有找到一个明确的答案,我想知道是否该领域的Maven可以帮助我回答这个问题,以满足我自己的好奇心。
我知道有些消息队列只提供一次和fifo保证。我也知道流媒体系统中事件时间和处理时间的概念。例如,在基于日志的消息队列(如kafka)中,由于偏移量和消息持久性的存在,混合排序的可能性可能较小(我可能错了)。我还考虑过使用时间戳,要求每个消息发送者在发送消息之前记录消息的时间,但由于时钟偏移,这充满了不一致性。
考虑到所有这些,我想知道,在传统的消息传递系统(如amqp、jms或rabbitmq)中,一个地址如何混合排序,其中十几个物联网设备可能正在发送消息,而我作为消费者希望以正确的顺序协调它们。

of1yzvn4

of1yzvn41#

我可以回答关于Apache·Kafka的问题。apachekafka通过分区来保证主题的严格顺序,这意味着每个分区都是以严格顺序附加的不可变的消息序列。因此,在这种情况下,多个分区使用者可能会使用来自多个分区的消息,这些分区的顺序不严格。我们可以考虑以下两个选项,以实现严格的秩序。
如果按顺序查找一个生产者消息,则每个主题只能使用一个分区。因此,生产者将在同一个分区上按顺序发布,消费者将严格按照顺序消费。

生产者将消息发布到多个分区,因此在使用者组中使用多使用者,但使用“为每个使用者分配到特定分区”来使用来自特定分区的消息将保证每个使用者对每个分区有严格的顺序

p4tfgftt

p4tfgftt2#

如果您的系统正在使用队列,提供有序的消息保证,那么只需使用该通道(比如kakfa的单分区,在某些设置下是amqp)。但如果您的系统使用的队列没有提供严格的排序,那么一般的想法是,客户机可以在它发送到队列的每个消息上附加单调递增的[1]个数字(或时间戳)。这构成了制作者打算发送给接收者的序列的基础。
如何获得月度增值:
使用时间戳:带有clock\u monotonic[2]的posix clock\u gettime()函数提供了获取单调递增时间戳的选项,生产者可以使用该选项为每条消息添加时间戳。当接收器看到接收到的消息的时间戳比最新消息的时间戳早时,它可以识别出无序消息。
使用序列号:在发送每条消息之前,您可以简单地增加一个原子计数器,并将计数器值附加到每条消息上,这样接收方就可以知道预期的顺序。这将形成严格的递增序列。这种方法非常类似于lamport的逻辑时钟[3],它为生产者提供虚拟时钟。
在接收方处理无序消息:这在很大程度上是特定于应用程序的,但一般来说,当消息无序到达时,您有两个选项:a)丢弃旧消息,例如在接收方必须显示股票最新价值的情况下。b) 有缓冲区来重新排序,比如在tcp连接中(例如zookeeper使用tcp作为fifo排序的队列[4-5])
工具:如果您没有向消息添加时间戳,那么将所有消息从producer按顺序发送到apachekafka单个分区,因为这将确保接收方可以按顺序接收消息。
如果您使用的消息传递系统不能保证有序传递(如某些设置下的amqp[6]),那么您可以考虑在每条消息中添加额外的单调递增的数字/时钟。
[1] https://en.wiktionary.org/wiki/monotonic_increasing#targettext=形容词,对比%20this%20with%20trictly%20increasing
[2] https://linux.die.net/man/2/clock_gettime
[3] https://en.wikipedia.org/wiki/lamport_timestamps#lamport分布式系统中的逻辑时钟
[4] https://cwiki.apache.org/confluence/download/attachments/24193445/zookeeper-internals.pdf?version=1&modificationdate=1295034038000&api=v2
[5] http://www.tcs.hut.fi/studies/t-79.5001/reports/2012-desouzamedeiros.pdf
[6] rabbitmq-消息传递顺序

相关问题