我正在使用kafka流,并希望从java开始重置一些消费偏移量。 KafkaConsumer.seekToBeginning(...) 听起来这样做是对的,但我和Kafka一起工作:
KafkaConsumer.seekToBeginning(...)
KafkaStreams streams = new KafkaStreams(builder, props); ... streams.start();
我猜,根据我定义的混凝土流管道,这将在引擎盖下创建几个消费者。我能拿到那些吗?或者是否有其他方法以编程方式重置偏移?
kqhtkvqz1#
因为您使用的是kafka流,所以您不仅需要重置使用者偏移,还需要重置流的内部状态存储。幸运的是,kafka提供了一个streams应用程序重置工具。看到了吗https://cwiki.apache.org/confluence/display/kafka/kafka+streams+application+reset+tool
oprakyz72#
在hans jespersens answer的基础上,我成功地使用以下代码来完成脚本在java代码中所做的工作:
import kafka.tools.StreamsResetter; StreamsResetter resetter = new StreamsResetter(); String[] args = {"--application-id", APP_ID, "--bootstrap-servers", KAFKA_SERVERS, "--input-topics", TEST_TOPIC_NAME, "--zookeeper", ZOOKEEPER}; resetter.run(args);
该类是我在maven中导入的kafka核心库的一部分,使用:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>${kafka.version}</version> </dependency>
2条答案
按热度按时间kqhtkvqz1#
因为您使用的是kafka流,所以您不仅需要重置使用者偏移,还需要重置流的内部状态存储。
幸运的是,kafka提供了一个streams应用程序重置工具。
看到了吗https://cwiki.apache.org/confluence/display/kafka/kafka+streams+application+reset+tool
oprakyz72#
在hans jespersens answer的基础上,我成功地使用以下代码来完成脚本在java代码中所做的工作:
该类是我在maven中导入的kafka核心库的一部分,使用: