我想要一个有10个分区的主题。我使用的是Kafka的默认配置。我用那个助手脚本创建了一个包含10个分区的主题,现在我要向它生成消息。
问题是,似乎只有5个分区,消费者从中获取数据。
让我们更详细地描述一下。
我知道每个分区需要一个用户线程的常见情况。我希望能够提交每个分区的偏移量,这只有在每个分区有一个线程/使用者连接器时才可能(我使用的是高级使用者)。
因此,我创建了10个线程,在每个线程中调用consumer.createJavaConsumerConnectionr(),我在这里这样做
topicCountMap.put("mytopic", 1);
最后我有一个迭代器,它使用来自一个分区的消息。
当我这样做10次时,我有10个使用者,每个分区的每个线程的使用者,在这里我可以独立地提交每个分区的偏移量,因为如果我在主题Map中输入不同于1的数字,我最终会得到该主题的多个使用者线程,所以如果我要用创建的使用者示例提交偏移量,它将为所有不需要的线程提交它们,从而为多个不需要的分区提交它们。
但问题是,当我使用消费者,只有5个消费者参与,似乎其他线程是闲置的,但我不知道为什么。
第一个可能的原因是,即使我有10个分区,也只有5个分区有消息,因此其他5个使用者处于空闲状态,但我不明白,当我使用producer时,消息如何不均匀地分布到所有分区。我发送了大约一百万条信息,所以如果说它们是均匀分布的,那么每个分区都必须至少包含一些信息。
//编辑
我设法在一个主题中创建了10个分区,但我只有7个使用者。这对我来说只是个奇迹。
问题是我正在循环中创建这些消费线程。因此,我首先启动一个线程(提交给执行器服务),然后启动另一个线程,然后启动另一个线程,依此类推。
因此,场景是第一个使用者获得所有10个分区,然后第二个连接,所以它在这两个分区之间被拆分为5和5(或类似的东西),然后其他线程正在连接。
我将其理解为所有使用者之间的分区重新平衡,因此它在这样的意义上表现得很好:如果创建了更多的使用者,则这些使用者之间会发生分区平衡,因此每个使用者都应该有一些分区来操作。
但是从结果中我看到只有7个消费者,根据消费的消息,它们似乎是按3,2,1,1,1,1分区划分的。是的,这7个消费者覆盖了所有10个分区,但是为什么拥有超过1个分区的消费者不进行分割,而将分区分配给其余3个消费者呢?
我很想知道剩下的3个线程发生了什么,为什么它们不从分配了超过1个分区的用户那里“获取”分区。
1条答案
按热度按时间pobjuy321#
在通过管理脚本创建主题之前,当我(意外地)以编程方式访问主题时,我见过类似的行为。在这种情况下,分区数以及其他主题配置设置默认为broker.config中的值