如何为Kafka的消费者发送offsetcommitrequest?

bhmjp9jg  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(468)

我尝试在kafka consumer api中为版本0.9使用offsetcommitrequest,该版本包含在以下包中:org.apache.kafka.common.requests.offsetcommitrequest
如何发送此请求?理想的使用方法是什么?我想把Kafka本身的补偿。我没有找到任何与版本0.9相关的文档。大部分都是0.8.x版本
此外,此请求的构造函数采用生成id、成员id和保留时间。这些领域是什么?

dgenwo3n

dgenwo3n1#

如果要手动提交偏移量,可能应该设置使用者属性 enable.auto.commit=false 并使用kafka consumer的commitsync()或commitsync()方法。例如,您可以在处理所有consumerrecords之后调用commitsync()。或者,即使在每次收到consumerrecord之后,也可以只提交所需的topicpartition。这样地:

Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
offsetMap.put(new TopicPartition(someTopic, somePartition), new OffsetAndMetadata(someOffset));
kafkaConsumer.commitSync(offsetMap);

相关问题