在调试模式下启动应用程序时,我的应用程序可以很好地运行以下代码,但在运行应用程序时,它不会进入循环。即使当前时间戳大于ts2,它也不会进入循环。
@KafkaListener( id ="consumerContainer", topics = "topic1", groupId = "Consumer_Test", containerFactory = "kafkaListenerContainerFactory")
public void consume(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws Exception {
Long ts1 = record.timestamp();
Long ts2 = ts1 + TimeUnit.MINUTES.toMillis(5);
Long currentDateTime = System.currentTimeMillis();
int b3 = currentDateTime.compareTo(ts2);
if (b3 > 0) {
this.recordPublisher.publish(record.value());
acknowledgment.acknowledge();
System.out.println("record received is "+record.key());
}
}
我的工厂配置也是:
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setSyncCommits(true);
暂无答案!
目前还没有任何答案,快来回答吧!