apache kafka:暂停后提交同步

aij0ehis  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(350)

在我们的代码中,我们计划手动提交偏移量。我们对数据的处理是长期的,因此我们遵循前面建议的模式
阅读记录
在自己的线程中处理记录
暂停消费者
继续轮询暂停的消费者,使其处于活动状态
处理记录时,提交偏移量
提交完成后,恢复使用者
代码有点像这样:

while (true) { 
          ConsumerRecords<String, String> records = consumer.poll(kafkaConfig.getTopicPolling());
                if (!records.isEmpty()) {
                    task = pool.submit(new ProcessorTask(processor, createRecordsList(records)));
                }
                if (shouldPause(task)) {
                    consumer.pause(listener.getPartitions());
                }
                if (isDoneProcessing(task)) {
                   consumer.commitSync();
                   consumer.resume(listener.getPartitions());
                }
    }

如果您注意到了,我们将使用commitsync()提交(不带任何参数)。由于消费者被暂停,在下一次迭代中我们将不会得到任何记录。但是commitsync()稍后会发生。在这种情况下,它会尝试提交哪个偏移量?我已经阅读了权威指南并在google上搜索了一下,但找不到任何关于它的信息。
我认为我们应该明确地保存偏移量。但我不确定当前的代码是否会成为一个问题。
任何信息都会有帮助。
谢谢,普拉蒂克

snz8szmq

snz8szmq1#

如果你打电话 consumer.commitSync() 如果没有参数,它应该提交消费者收到的最新偏移量。因为你可以在同一时间收到许多信息 poll() 您可能希望对提交具有更好的控制,并显式提交特定的偏移量,例如您的使用者已成功处理的最新消息。这可以通过打电话来完成 commitSync(Map<TopicPartition,OffsetAndMetadata> offsets) 您可以在consumerjavadoc中看到调用commitsync的两种方法的语法http://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/kafkaconsumer.html#commitsync()

相关问题