为什么消费者在重新启动后阅读kafka主题中的所有消息?

ej83mcc0  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(444)

我使用confluent.net客户端。订阅者总是在重新启动(订阅者服务重新启动)后读取来自kafka主题的所有消息。如何提交消费者已经获得的补偿并从中读取?也许一些消费者配置可以帮助。。。

wvmv3b1j

wvmv3b1j1#

这只是猜测,但如何声明消费者的组id?我见过一些使用这种随机分配的例子:

["group.id"] = Guid.NewGuid().ToString(),

如果你宣布一个新的/随机的 group.id 每次启动消费者时,都会在每次执行时注册一个新的消费者组,这涉及 auto.offset.reset 踢进来。
如果此属性设置为“ earliest “,然后每次启动消费者时(假设他们每次都有不同的group.id),他们将从第一个可用的偏移量开始,就像在您的情况中一样,从头开始重新读取所有消息。
如果此属性设置为“ latest ,并且您的制作者当前没有发送任何消息,您将无法读取任何内容,这可能会造成一些混乱。
尝试设置一个固定的 group.id :开始消费,在代理上仍有消息可用时停止进程,然后再次启动消费程序,而不更改最后一个 group.id .
这一次,由于消费者群体已经注册, auto.offset.reset 将被忽略,并且起始位置将由提交的偏移量定义,默认情况下,这些偏移量存储在名为 __consumer_offsets .

pprl5pva

pprl5pva2#

您有两种选择:
通过设置使用者属性启用自动提交 EnableAutoCommit = true (消息在可配置的时间(通常为5秒)后提交),或
手动提交获取的偏移量 consumer.Commit(consumeResult) .
github上显示了手动提交的示例。

相关问题