我对Kafka(也对英语……)相当陌生,我面对这个问题,无法谷歌任何解决方案。
我使用spring-boot,spring-kafka支持,我已经在本地机器上安装了kafka\u2.11-0.10.1.1(只有一个代理0)
s1.然后我创建主题
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic tracking
我的使用者配置:applications.properties:
kafka.servers.bootstrap=localhost:9092
kafka.topic.tracking=tracking
kafka.group.id=trackingGroup
kafka.client.id=client-1
s2。然后我通过更改“kafka.client.id”并运行spring boot主类来启动3个消费者。在eclipse控制台上,我可以检查分区分配:
client-1: partitions assigned:[tracking-4, tracking-3]
client-2: partitions assigned:[tracking-2, tracking-1]
client-3: partitions assigned:[tracking-0]
第三。启动pruducer向主题发送20条消息,每条消息开始消耗特定分区的消息
s4。我关闭消耗1,Kafka自动重新平衡,新分区分配:
client-1: partitions assigned:[]
client-2: partitions assigned:[tracking-2,tracking-1, tracking-0]
client-3: partitions assigned:[tracking-4,tracking-3]
s5。我发现分区'tracking-3'上的消息没有被消耗!!
问题每次都可以被复制,在新分配的分区中丢失一些消息,你能有什么建议吗?请帮帮我,谢谢
1条答案
按热度按时间lyfkaqu11#
我复制了它;这看起来像是Kafka本身的问题
auto.comit.enabled=true
)关于重新平衡,Kafka正在报告未读分区的“位置”(the offset of the <i>next record</i> that will be fetched (if a record with that offset exists)
)作为分区的结尾。事实上,当我使用kafka消费者组工具时,未读分区的偏移量已经在“末尾”了。当我用一个消费者运行它时,当它读取第一个分区时,我看到。。。
请注意当前的\u offset列。
在下一次运行中,我运行了两次,一次是在处理第一个分区时,一次是在稍后运行。。。
和
看看分区2的当前偏移量是如何从44降到41的。
禁用自动提交为我解决了它。。。
...
这是我的测试程序:
具有属性
我在0.10.2.0中也看到了相同的结果。
编辑
原来是SpringKafka虫;它在启用自动提交的情况下工作,但是必须显式地启用它
否则容器会假定
false
并导致上述奇怪的行为-看起来客户端不喜欢在启用自动提交的情况下调用使用者的提交方法#288我通常会建议设置为false,并选择容器的
AckMode
而是s;例如。RECORD
记录下来之后,BATCH
在轮询收到的每个批之后(默认)。