我用分区和复制因子创建了Kafka主题
然后我开始和我的制作人一起制作信息
我开始两个消费者的帮助下,消费者群体并行。
当我停止一个消费者时,另一个消费者开始工作。我怀疑我下载Kafka时的默认配置误导了我。我希望我的所有消费者都需要并发地接收消息。
mccptt671#
一个组中的两个消费者不能同时读取同一个分区。您将需要验证记录是否同时写入两个分区,而不是一起进行批处理,这是控制台生产者对小记录批处理的默认行为使用GetOffsetShell工具检查每个分区有多少偏移量(大约记录数)。您也可以使用kafka-consumer-groups命令来查看组的滞后。如果您看到所有分区都有零延迟,那么您已经使用了所有消费者的主题中的所有记录(请注意,一个分区可能具有相同值的开始和结束偏移量,GetOffsetShell也会显示,这意味着该分区为空,从而解释了为什么只有一个消费者打印任何数据)。
kafka-consumer-groups
j0pj023g2#
Kafka的默认批量大小为16384(字节),一旦我们开始产生足够的消息来填充该缓冲区大小,它就会改变分区。从kafka2.4开始,默认分区已从循环改为sticky所以我像这样重写了这个属性。C:\kafka\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --batch-size=1 --topic test32我们还可以通过指定如下的分区ID来向特定的分区生成消息:C:\kafka\bin\windows>kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test32 -property "parse.key=true" -property "key.separator=:"0:test1:test2其中0和1是分区id,test、test2是值。
C:\kafka\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --batch-size=1 --topic test32
C:\kafka\bin\windows>kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test32 -property "parse.key=true" -property "key.separator=:"
0:test
1:test2
0
1
test
test2
2条答案
按热度按时间mccptt671#
一个组中的两个消费者不能同时读取同一个分区。您将需要验证记录是否同时写入两个分区,而不是一起进行批处理,这是控制台生产者对小记录批处理的默认行为
使用GetOffsetShell工具检查每个分区有多少偏移量(大约记录数)。您也可以使用
kafka-consumer-groups
命令来查看组的滞后。如果您看到所有分区都有零延迟,那么您已经使用了所有消费者的主题中的所有记录(请注意,一个分区可能具有相同值的开始和结束偏移量,GetOffsetShell也会显示,这意味着该分区为空,从而解释了为什么只有一个消费者打印任何数据)。j0pj023g2#
Kafka的默认批量大小为16384(字节),一旦我们开始产生足够的消息来填充该缓冲区大小,它就会改变分区。
从kafka2.4开始,默认分区已从循环改为sticky
所以我像这样重写了这个属性。
C:\kafka\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --batch-size=1 --topic test32
我们还可以通过指定如下的分区ID来向特定的分区生成消息:
C:\kafka\bin\windows>kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test32 -property "parse.key=true" -property "key.separator=:"
0:test
1:test2
其中
0
和1
是分区id,test
、test2
是值。