kafka用户手动提交偏移量

bvk5enib  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(440)

我正在实现和dsl-spring集成流,它接收来自kafka的消息
代码段:

return IntegrationFlows.from(
                Kafka.messageDrivenChannelAdapter(new DefaultKafkaConsumerFactory(kafkaTelemetryDataConsumerConfiguration.getConsumerProperties()),
                        kafkaPropertiesConfiguration.getTelemetryDataTopic()))
                })
                .handle(bla.someImportantOperation())
                //TODO:do manual commit here
                //.handle(consumer.commitSync())

                .get();

我想知道如何手动提交同步,但只能在 .handle(bla.someImportantOperation()) 已成功完成。
我不知道如何获得消费者参考,因为我使用的是defaultkafkaconsumerfactory,如果有任何帮助,我将不胜感激。
以下是我用来创建consumer的consumerproperties:

consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaPropertiesConfiguration.getBootstrapServers());
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

consumerProperties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, kafkaPropertiesConfiguration.getClientIdConfig());
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, kafkaPropertiesConfiguration.getGroupIdConfig());

consumerProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
6gpjuf90

6gpjuf901#

这个 Kafka.messageDrivenChannelAdapter() 为您提供配置程序挂钩:

.configureListenerContainer(c ->
                                c.ackMode(ContainerProperties.AckMode.MANUAL))

注意我提供的选项。
阅读它的javadocs,然后 AcknowledgingMessageListener . 有人提到 Acknowledgment . 这一个通过出现在消息头中 KafkaHeaders.ACKNOWLEDGMENT .
那么,你需要什么 //.handle(consumer.commitSync()) 就像这样:

.handle(m -> headers.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class).acknowledge())

有关ApacheKafka文档,请参见 Spring 的更多信息:https://docs.spring.io/spring-kafka/docs/2.2.0.release/reference/html/_reference.html#committing-偏移量

相关问题