消费群体的Kafka流偏移重置为零

9nvpjoqh  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(402)

我已经编写了kafka流应用程序,它只是根据一些条件过滤行并将其加载到mongodb。
流处理工作正常,但由于我的代码中的一些缺陷,我想重新处理整个数据。
一种方法是杀死流媒体应用程序,更改消费者组id,从mongo中删除数据,然后重新运行应用程序。
如何在不更改消费者组id的情况下实现此场景。
<<我正在使用Kafka0.10版本>>
谢谢你,帕里

3bygqnnd

3bygqnnd1#

从matthias j。萨克斯matthias@confluent.io -
目前,更改应用程序id是最好的方法。正确清理应用程序状态有点棘手。我们目前正在对此进行改进——应该很快就能提供。
看到了吗https://issues.apache.org/jira/browse/kafka-3185
干杯,帕里

vyswwuz2

vyswwuz22#

apachekafka 0.10.0.1(8月份发布,7月份提出了最初的问题)附带了一个新的kafka流应用程序重置工具,这是一个比简单重命名更简单、更好/更干净的解决方案 application.id .
您可以通过脚本执行该工具 bin/kafka-streams-application-reset.sh ,它还将打印用法/帮助消息。
例子:


# Run this only after ALL application instances were stopped!

$ bin/kafka-streams-application-reset --application-id my-streams-app \
                                      --input-topics my-input-topic \
                                      --intermediate-topics rekeyed-topic \
                                      --bootstrap-servers brokerHost:9092 \
                                      --zookeeper zookeeperHost:2181

也就是说,我建议阅读kafka streams的数据再处理:重置streams应用程序,前面提到的matthias j。萨克斯写道,进一步的细节。这篇文章还解释了为什么要简单地重命名 application.id (这是目前为止的解决办法)不是最好的主意。

相关问题