Kafka主题设计最佳方法

q43xntqr  于 2021-06-05  发布在  Kafka
关注(0)|答案(2)|浏览(293)

我们正在设计一个使用apachekafka发送关键业务数据的集成。我们有一个生产者和5个消费者,所以我创建了一个主题和5个分区,为每个消费者分配一个分区,但是我们需要以生产者发送的相同顺序传递信息,我们无法实现。我读到,我只能实现按分区排序,所以如果我只有一个分区,我应该可以做到这一点,但由于我有5个消费者,我需要分区并行主题。因此,我认为我必须使用主题键,但由于顺序仅由分区决定,因此我有一些问题:如果我在kafka producer中使用键,我应该发送指定分区号的有效负载(即在producer代码中写入消息5次,每个分区一次)?,或者仅通过向主题发送带有键的数据,Kafka在每个分区?中以相同的顺序复制和写入数据?。例子: for(int i=0;i<=partitionsnumber;i++){ sendtoKafka(i,key,payload); } 在这种情况下,我应该为每个使用者使用一个主题而不是分区吗?
以相同的顺序向所有消费者发送数据的最佳策略是什么?
注意:消息中唯一的键是string类型。

lnlaulya

lnlaulya1#

你需要所有的消费者都阅读制作人发布的相同消息,对吗?
如果是这样的话,就不必向主题的所有5个分区发布/生成相同的消息。
一个更简单的方法是创建一个具有1个分区的主题,并且您的producer应用程序将把所有消息发布到该主题/分区。
现在,您可以轻松地创建使用来自同一主题的数据的不同使用者组的使用者应用程序。给你的消费者分配一些随机的id,这样你就可以使用一个主题/分区和所有5个消费者,并且可以提交偏移量。
只需将下面的代码片段添加到所有5个消费者应用程序属性中。

props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); // randomise consumer group.

如果你有任何问题,请告诉我。

xbp102n0

xbp102n02#

我无法添加评论,因为它很长。
您在评论中提到的“我们需要相等数量的分区用于消费应用程序”是正确的。但是,它仅适用于所有消费者(在您的例子中是its5)都属于同一个消费者组的情况。
例如,一个主题t有5个分区,现在假设我们创建一个消费者c1和消费者组g1。使用者c1将从主题t的所有5个分区获得消息。然后,我们在同一消费者组g1下添加消费者c2。c1将消耗3个分区,c2将消耗其余2个分区(反之亦然)。现在您所提到的“每个使用者应用程序一个分区”是一个理想的场景,在这种情况下,同一使用者组(g1)下的5个使用者可以并行地使用所有5个分区。这个概念叫做可伸缩性。
现在,在您的例子中,您需要读取5次相同的数据,因为您有5个消费者。在这种情况下,您可以编写一个简单的producer应用程序,发布一个分区为1的主题上的数据,而不是将相同的消息发布到5个分区,然后使用来自所有5个使用者的相同消息。然后,您的5个消费者应用程序可以独立地使用相同的数据,即我告诉您为所有消费者应用程序分配随机消费者组名称,以便它将独立地使用消息(以及提交偏移量)。
在代码段下面。来自同一主题(1个分区)的两个并行消费消息:
消费者1:

Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); // randomise consumer group for consumer 1. 
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

            KafkaConsumer consumerLiveVideo = new KafkaConsumer(props);
            consumerLiveVideo.subscribe(Collections.singletonList(topicName[0])); // topic with 1 partition

消费者2:

Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); // randomise consumer group for consumer 2 . 
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            KafkaConsumer consumerLiveVideo = new KafkaConsumer(props);
            consumerLiveVideo.subscribe(Collections.singletonList(topicName[0])); // topic with 1 partition

您还询问了正确的方法,根据我的说法,您只需要一个单一的消费者应用程序。另外,不要在kafka中混淆复制和可伸缩性的概念,因为这两个概念都非常关键。
另外,您已经提到了关键数据,您可以阅读关于producer配置参数acks(根据您的场景使用参数acks=1或acks=all)。
有关可伸缩性、复制、用户组、用户/生产者/代理/主题的更多详细信息,请参阅kafka最终指南的第1-5章。

相关问题