将使用者偏移量重置为kafka流的开头

2guxujil  于 2021-06-30  发布在  Kafka
关注(0)|答案(2)|浏览(429)

我正在使用kafka流,并希望从java开始重置一些消费偏移量。 KafkaConsumer.seekToBeginning(...) 听起来这样做是对的,但我和Kafka一起工作:

KafkaStreams streams = new KafkaStreams(builder, props);
...
streams.start();

我猜,根据我定义的混凝土流管道,这将在引擎盖下创建几个消费者。我能拿到那些吗?或者是否有其他方法以编程方式重置偏移?

kqhtkvqz

kqhtkvqz1#

因为您使用的是kafka流,所以您不仅需要重置使用者偏移,还需要重置流的内部状态存储。
幸运的是,kafka提供了一个streams应用程序重置工具。
看到了吗https://cwiki.apache.org/confluence/display/kafka/kafka+streams+application+reset+tool

oprakyz7

oprakyz72#

在hans jespersens answer的基础上,我成功地使用以下代码来完成脚本在java代码中所做的工作:

import kafka.tools.StreamsResetter;

StreamsResetter resetter = new StreamsResetter();
String[] args = {"--application-id", APP_ID, "--bootstrap-servers", KAFKA_SERVERS, "--input-topics", TEST_TOPIC_NAME, "--zookeeper", ZOOKEEPER};
resetter.run(args);

该类是我在maven中导入的kafka核心库的一部分,使用:

<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>${kafka.version}</version>
    </dependency>

相关问题