我使用confluent.net客户端。订阅者总是在重新启动(订阅者服务重新启动)后读取来自kafka主题的所有消息。如何提交消费者已经获得的补偿并从中读取?也许一些消费者配置可以帮助。。。
wvmv3b1j1#
这只是猜测,但如何声明消费者的组id?我见过一些使用这种随机分配的例子:
["group.id"] = Guid.NewGuid().ToString(),
如果你宣布一个新的/随机的 group.id 每次启动消费者时,都会在每次执行时注册一个新的消费者组,这涉及 auto.offset.reset 踢进来。如果此属性设置为“ earliest “,然后每次启动消费者时(假设他们每次都有不同的group.id),他们将从第一个可用的偏移量开始,就像在您的情况中一样,从头开始重新读取所有消息。如果此属性设置为“ latest ,并且您的制作者当前没有发送任何消息,您将无法读取任何内容,这可能会造成一些混乱。尝试设置一个固定的 group.id :开始消费,在代理上仍有消息可用时停止进程,然后再次启动消费程序,而不更改最后一个 group.id .这一次,由于消费者群体已经注册, auto.offset.reset 将被忽略,并且起始位置将由提交的偏移量定义,默认情况下,这些偏移量存储在名为 __consumer_offsets .
group.id
auto.offset.reset
earliest
latest
__consumer_offsets
pprl5pva2#
您有两种选择:通过设置使用者属性启用自动提交 EnableAutoCommit = true (消息在可配置的时间(通常为5秒)后提交),或手动提交获取的偏移量 consumer.Commit(consumeResult) .github上显示了手动提交的示例。
EnableAutoCommit = true
consumer.Commit(consumeResult)
2条答案
按热度按时间wvmv3b1j1#
这只是猜测,但如何声明消费者的组id?我见过一些使用这种随机分配的例子:
如果你宣布一个新的/随机的
group.id
每次启动消费者时,都会在每次执行时注册一个新的消费者组,这涉及auto.offset.reset
踢进来。如果此属性设置为“
earliest
“,然后每次启动消费者时(假设他们每次都有不同的group.id),他们将从第一个可用的偏移量开始,就像在您的情况中一样,从头开始重新读取所有消息。如果此属性设置为“
latest
,并且您的制作者当前没有发送任何消息,您将无法读取任何内容,这可能会造成一些混乱。尝试设置一个固定的
group.id
:开始消费,在代理上仍有消息可用时停止进程,然后再次启动消费程序,而不更改最后一个group.id
.这一次,由于消费者群体已经注册,
auto.offset.reset
将被忽略,并且起始位置将由提交的偏移量定义,默认情况下,这些偏移量存储在名为__consumer_offsets
.pprl5pva2#
您有两种选择:
通过设置使用者属性启用自动提交
EnableAutoCommit = true
(消息在可配置的时间(通常为5秒)后提交),或手动提交获取的偏移量
consumer.Commit(consumeResult)
.github上显示了手动提交的示例。