kafka没有可用作参数的确认

gkl3eglg  于 2021-07-26  发布在  Java
关注(0)|答案(0)|浏览(2251)

我们正试图在JavaSpring项目中实现kafka确认。在没有确认的情况下,我们成功地接收并读取了消息,但是当我们在方法中添加确认时,我们会收到以下错误:

  1. org.springframework.kafka.listener.ListenerExecutionFailedException: invokeHandler Failed; nested exception is java.lang.IllegalStateException:
  2. No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.;
  3. nested exception is java.lang.IllegalStateException:
  4. No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.
  5. Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from
  6. [proto.DeviceModelOuterClass$DevModel] to
  7. [org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload=sender: "12345678-1234-1234-1234-123456789102"

我们实现确认的方式如kafka api中所述:

  1. @KafkaListener(
  2. id = Constants.TOPIC_LISTENER,
  3. topics = "${info.dev.name}",
  4. autoStartup = "false",
  5. properties = {
  6. "value.deserializer=com.cit.iomt.core.DevModelDeserializer",
  7. "key.deserializer=org.apache.kafka.common.serialization.UUIDDeserializer"
  8. })
  9. public void listenToUpdateTopic(@Payload DevModel message, Acknowledgment a) throws Exception {
  10. LOG.info(Constants.READ_KAFKA_TOPIC, message);
  11. a.acknowledge();}

在属性文件中我们有:

  1. spring.kafka.listener.ack-mode=manual_immediate
  2. spring.kafka.consumer.enable-auto-commit=false
  3. spring.kafka.consumer.auto-offset-reset=earliest

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题