Spring Boot 如何避免为死信主题使用模式注册表?

b4qexyjb  于 2022-11-29  发布在  Spring
关注(0)|答案(1)|浏览(127)

我有一个KafkaListener,它正在使用来自Kafka的Avro消息。如果处理所使用消息的逻辑抛出异常,错误处理程序会将消息发送到DLT。我想将该消息作为JSON发送,并忽略任何模式注册表内容。我遇到了问题,因为如果我尝试使用JSON序列化器,它将无法序列化,因为它是一个Avro对象。

错误处理程序

@Bean
  public DefaultErrorHandler errorHandler(final KafkaTemplate<String, ?> template) {

    DefaultErrorHandler handler =
        new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(template), new FixedBackOff(0L, 0L));
    handler.addNotRetryableExceptions(
        IllegalArgumentException.class, SaveFactFailedException.class);
    return handler;
  }

生成器配置

producer:
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: com.amazonaws.services.schemaregistry.serializers.avro.AWSKafkaAvroSerializer

这是第一个问题。
第二个问题是原始侦听器和DLT侦听器都使用application.yml中设置的同一个反序列化器。如何为每个侦听器配置不同的反序列化器?

用户配置

consumer:
      enable-auto-commit: false
      group-id: mobile-layout-group
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer
      auto-offset-reset: earliest
bzzcjhmw

bzzcjhmw1#

所以我能够解决这个问题。我所要做的就是创建一个DeadLetterPublishingRecoverer的子类并覆盖createProducer()函数。然后我能够修改记录值。在我的例子中,我只是将其设置为空,因为我只需要DLT的标头。这篇文章很有帮助。
Spring Cloud Stream JSON Dead Letter Queue with Avro messages

相关问题