kafka不会再次轮询,即使在循环内没有执行commit

cgvd09ve  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(246)

在调试模式下启动应用程序时,我的应用程序可以很好地运行以下代码,但在运行应用程序时,它不会进入循环。即使当前时间戳大于ts2,它也不会进入循环。

@KafkaListener( id ="consumerContainer", topics = "topic1", groupId = "Consumer_Test", containerFactory = "kafkaListenerContainerFactory")
    public void consume(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws Exception {

           Long ts1 = record.timestamp();
           Long ts2 = ts1 + TimeUnit.MINUTES.toMillis(5);
           Long currentDateTime = System.currentTimeMillis();
           int b3 = currentDateTime.compareTo(ts2);

              if (b3 > 0) {
               this.recordPublisher.publish(record.value());
               acknowledgment.acknowledge();
               System.out.println("record received is "+record.key());
           }
    }

我的工厂配置也是:

factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setSyncCommits(true);

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题