我们正试图在JavaSpring项目中实现kafka确认。在没有确认的情况下,我们成功地接收并读取了消息,但是当我们在方法中添加确认时,我们会收到以下错误:
org.springframework.kafka.listener.ListenerExecutionFailedException: invokeHandler Failed; nested exception is java.lang.IllegalStateException:
No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.;
nested exception is java.lang.IllegalStateException:
No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from
[proto.DeviceModelOuterClass$DevModel] to
[org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload=sender: "12345678-1234-1234-1234-123456789102"
我们实现确认的方式如kafka api中所述:
@KafkaListener(
id = Constants.TOPIC_LISTENER,
topics = "${info.dev.name}",
autoStartup = "false",
properties = {
"value.deserializer=com.cit.iomt.core.DevModelDeserializer",
"key.deserializer=org.apache.kafka.common.serialization.UUIDDeserializer"
})
public void listenToUpdateTopic(@Payload DevModel message, Acknowledgment a) throws Exception {
LOG.info(Constants.READ_KAFKA_TOPIC, message);
a.acknowledge();}
在属性文件中我们有:
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
暂无答案!
目前还没有任何答案,快来回答吧!