我有一个kafka主题和一个消费者,在spring云应用程序中分配了一个消费者组(必须)。作为一项要求,在每次应用程序重启时,我都需要从头开始读取所有接收到的消息。这应该是通过 resetOffsets
属性,但从这个问题可以清楚地看出,它目前不起作用。
我在kafka consumer api中发现了这种解决方法,它建议在每次重新启动时为consumer组分配一个新的随机名称,作为从最早开始读取的一种方式。在Spring的云流中有可能/推荐吗?如何为消费者组定义动态名称?
我有一个kafka主题和一个消费者,在spring云应用程序中分配了一个消费者组(必须)。作为一项要求,在每次应用程序重启时,我都需要从头开始读取所有接收到的消息。这应该是通过 resetOffsets
属性,但从这个问题可以清楚地看出,它目前不起作用。
我在kafka consumer api中发现了这种解决方法,它建议在每次重新启动时为consumer组分配一个新的随机名称,作为从最早开始读取的一种方式。在Spring的云流中有可能/推荐吗?如何为消费者组定义动态名称?
2条答案
按热度按时间kmbjn2e31#
如果每次都要求应用程序从头开始重新启动,则有几个选项:
您可以将提交的偏移量重置为
earliest
在使用kafka-consumer-groups.sh
工具(kafka.admin.ConsumerGroupCommand.scala
)在重新启动时,应用程序可以查找到开头并手动提交偏移量0。如果你有
auto.offset.reset
设置为earliest
即使0不是有效的偏移量,它也会从头开始重新启动。你可以使用不同的消费者
group.id
珍惜每一次。在您的消费者中Configuration
豆子,插入Properties
对象如下:最后,你会使用委员会补偿吗?如果没有,就禁用
enable.auto.commit
然后应用程序将始终遵循auto.offset.reset
设置。选项1和2通常是首选的,因为它们保持一致的
group.id
允许轻松地将使用者示例添加到组并监视th组。zbwhf8kr2#
是的,它也适用于scst,但是正如您所说的,设置一个随机组id有点棘手,尽管您可以将其设置为
System.property
在启动SpringApplication
.如果您直接使用springkafka,那么很简单,只需实现
ConsumerSeekAware
你可以的seekToBeginning
分配分区时。但是,使用scst,您不能直接访问侦听器。
一种解决方法是在启动
SpringApplication
通过创建具有相同组id的使用者,这会有点棘手,但是如果你有多个应用程序示例,因为你的应用程序每次可能会得到不同的分区。我们将再次讨论如何解决这个问题(我刚刚对此发表了评论)。