我需要创建一个消费者,它能够从多个主题中提取消息并根据时间戳(kafka消息时间戳)订购消息
有点像这样:
(对不起,画得不好……)
在本例中,我订阅了“topica”和“topicb”,并按时间戳的顺序对消息进行排队
现在,只要所有主题只有一个分区,就可以用以下伪代码轻松解决:
kafka.subscribe(['topicA', 'topicB'])
messagesByTopic = {}
finalMessageQueue = []
while true:
records = kafka.poll()
for record in records:
messagesByTopic[record.topic()].enqueue(record)
while messagesByTopic.any(queue => !queue.notEmpty()):
minQueue = messagesByTopic.min(queue => queue.peek().timestamp)
finalMessageQueue.enqueue(minQueue.pop())
当我为每个主题引入多个分区时,问题就出现了。显然,不可能将多个主题按时间排序到一个流中,因为在一个主题中,顺序不能保证,只能在一个分区中,所以新的问题是将多个主题排序到具有相同密钥的流中。
设想2个主题,order和draw主题中消息的键是事务所属的客户id。
目标是将所有主题流到队列中(每个客户一个),按时间戳排序。
理论上应该是可能的,因为order和retrach主题中的消息是按每个客户的时间戳排序的,实际上,当处理每个主题的单个分区时,这个问题很容易解决。
现在,考虑2个orders分区和1个drawing分区的情况,如果有两个进程同时运行会发生什么?一个流程将撤回所有客户,但只有一半客户的订单,第二个流程将只有一半客户的订单,它将分离。
唯一的办法就是告诉Kafka,确保相同的密钥(甚至来自不同的主题)总是被路由到同一个进程,但据我所知,没有办法做到这一点。
我卡住了。我需要一个办法。
1条答案
按热度按时间s2j5cfk01#
为了达到预期的效果,您应该确保两个主题之间的分区具有一定的对应性,方法是更改消息生产者对消息的分区方式,或者在排序逻辑之前将原始主题中的数据重新分区为新的中间主题。理想情况下,两个主题的分区之间会有1:1的对应关系。一般来说,您的并行性(线程数)是由两个主题的分区计数之间的最高公因数限定的,例如,如果orders主题有12个分区,而drawing主题有9个分区,那么您可以将分区分配给hcf(12,9)=3个线程,如下所示:线程1:orders partitions(0,1,2,3),drawing partitions(0,1,2) 线程2:orders partitions(4,5,6,7),drawing partitions(3,4,5)线程3:orders partitions(8,9,10,11),drawing partitions(6,7,8)要使其工作,您需要为这两个主题实现自定义分区,而不是默认分区。
但是,如果一个主题有1个分区,而另一个有2个分区,那么hcf(1,2)是1,这意味着您只能通过单线程的方式来完成。