ApacheKafka—通过在每次应用程序重新启动时创建新的使用者组来重置主题偏移量

lb3vh1jj  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(312)

我有一个kafka主题和一个消费者,在spring云应用程序中分配了一个消费者组(必须)。作为一项要求,在每次应用程序重启时,我都需要从头开始读取所有接收到的消息。这应该是通过 resetOffsets 属性,但从这个问题可以清楚地看出,它目前不起作用。
我在kafka consumer api中发现了这种解决方法,它建议在每次重新启动时为consumer组分配一个新的随机名称,作为从最早开始读取的一种方式。在Spring的云流中有可能/推荐吗?如何为消费者组定义动态名称?

kmbjn2e3

kmbjn2e31#

如果每次都要求应用程序从头开始重新启动,则有几个选项:
您可以将提交的偏移量重置为 earliest 在使用 kafka-consumer-groups.sh 工具( kafka.admin.ConsumerGroupCommand.scala )
在重新启动时,应用程序可以查找到开头并手动提交偏移量0。如果你有 auto.offset.reset 设置为 earliest 即使0不是有效的偏移量,它也会从头开始重新启动。
你可以使用不同的消费者 group.id 珍惜每一次。在您的消费者中 Configuration 豆子,插入 Properties 对象如下:

properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());

最后,你会使用委员会补偿吗?如果没有,就禁用 enable.auto.commit 然后应用程序将始终遵循 auto.offset.reset 设置。
选项1和2通常是首选的,因为它们保持一致的 group.id 允许轻松地将使用者示例添加到组并监视th组。

zbwhf8kr

zbwhf8kr2#

是的,它也适用于scst,但是正如您所说的,设置一个随机组id有点棘手,尽管您可以将其设置为 System.property 在启动 SpringApplication .
如果您直接使用springkafka,那么很简单,只需实现 ConsumerSeekAware 你可以的 seekToBeginning 分配分区时。
但是,使用scst,您不能直接访问侦听器。
一种解决方法是在启动 SpringApplication 通过创建具有相同组id的使用者,这会有点棘手,但是如果你有多个应用程序示例,因为你的应用程序每次可能会得到不同的分区。
我们将再次讨论如何解决这个问题(我刚刚对此发表了评论)。

相关问题