acknowledge.acknowledge()在spring kafka@kafkalistener中引发异常

pcrecxhr  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(833)

当我将enable.auto.commit设置为false并尝试使用基于注解的spring-kafka@kafkalistener手动提交偏移量时,我得到一个org.springframework.kafka.listener.listenerexecutionfailedexception:无法使用传入消息调用侦听器方法
我有一个非常简单的代码如下:

@KafkaListener(id = "someid", topics = "${demo.topic}", containerFactory = "someContainerFactory")
public void listenFooGroup(String message, Acknowledgement ack) {
    System.out.println("Received Messasge in group 'foo': " + message);

    // TODO: Do something with the message
}

当我从制作人那里发送消息时,我得到以下例外:
org.springframework.kafka.listener.listenerexecutionfailedexception:无法对传入消息调用侦听器方法。
终结点处理程序详细信息:
方法[public void com.******************.kafkamessagelistener.listenfoogroup(java.lang.string,org.springframework.kafka.support.acknowledgement)]
bean[com.......*.*****。kafkamessagelistener@5856dbe4]; 嵌套异常为org.springframework.messaging.converter.messageconversionexception:无法处理消息;嵌套异常为org.springframework.messaging.converter.messageconversionexception:无法将genericmessage[payload=test,headers={kafka\u offset=57,kafka\u receivedmessagekey=null,kafka\u receivedpartitionid=0,kafka\u receivedtopic=demotopic}]从[java.lang.string]转换为[org.springframework.kafka.support.acknowledment],failedmessage=genericmessage[payload=test,headers={kafka\u offset=57,kafka\u receivedmessagekey=null,kafka\u receivedpartitionid=0,kafka\u receivedtopic=demotopic}]
请帮忙。蒂亚。

3qpi33ja

3qpi33ja1#

你必须设置集装箱工厂的 containerProperties 确认模式为 MANUAL 或者 MANUAL_IMMEDIATE 得到一个 Acknowledgment 对象。
对于其他ack模式,容器负责提交偏移量。

factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE)

或设置 ....ackMode 属性(如果使用spring boot)

相关问题