vertx-kafka自动提交

ih99xse1  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(434)

一直在尝试使用vertx用java编写kafka消费者。
我需要将自动提交设置为false(特定用例)。
下面是执行显式轮询的代码

consumer.subscribe("test", ar -> {

if (ar.succeeded()) {
System.out.println("Consumer subscribed");

vertx.setPeriodic(1000, timerId -> {

  consumer.poll(100, ar1 -> {

    if (ar1.succeeded()) {

      KafkaConsumerRecords<String, String> records = ar1.result();
      for (int i = 0; i < records.size(); i++) {
        KafkaConsumerRecord<String, String> record = records.recordAt(i);
        System.out.println("key=" + record.key() + ",value=" + record.value() +
          ",partition=" + record.partition() + ",offset=" + record.offset());
      }
    }
  });

});

} });
手动提交:

consumer.commit(ar -> {

 if (ar.succeeded()) {
 System.out.println("Last read message offset committed");
 }
 });

我的问题是,如果轮询频率设置为1000ms并且提交是手动的,那么如果消息没有在1000ms内处理,会发生什么?
下一次轮询将在处理第一组消息之前完成吗?如果是,它会再次获取同一组消息(尚未提交)还是一组较新的消息?

bakd9h0s

bakd9h0s1#

查看 KafkaConsumer#poll :
在每次轮询中,使用者将尝试使用上次使用的偏移量作为起始偏移量,并按顺序获取。最后消耗的偏移量可以通过seek(topicpartition,long)手动设置,也可以自动设置为订阅的分区列表的最后提交偏移量
最后消耗的偏移量是指 KafkaConsumer ,而不是它所承诺的那个。这意味着它不会再次获取相同的消息,但它将获取下一个100条消息。

相关问题