我有一个KafkaListener,它正在使用来自Kafka的Avro消息。如果处理所使用消息的逻辑抛出异常,错误处理程序会将消息发送到DLT。我想将该消息作为JSON发送,并忽略任何模式注册表内容。我遇到了问题,因为如果我尝试使用JSON序列化器,它将无法序列化,因为它是一个Avro对象。
错误处理程序
@Bean
public DefaultErrorHandler errorHandler(final KafkaTemplate<String, ?> template) {
DefaultErrorHandler handler =
new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(template), new FixedBackOff(0L, 0L));
handler.addNotRetryableExceptions(
IllegalArgumentException.class, SaveFactFailedException.class);
return handler;
}
生成器配置
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: com.amazonaws.services.schemaregistry.serializers.avro.AWSKafkaAvroSerializer
这是第一个问题。
第二个问题是原始侦听器和DLT侦听器都使用application.yml
中设置的同一个反序列化器。如何为每个侦听器配置不同的反序列化器?
用户配置
consumer:
enable-auto-commit: false
group-id: mobile-layout-group
key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
properties:
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.deserializer.value.delegate.class: com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer
auto-offset-reset: earliest
1条答案
按热度按时间bzzcjhmw1#
所以我能够解决这个问题。我所要做的就是创建一个
DeadLetterPublishingRecoverer
的子类并覆盖createProducer()
函数。然后我能够修改记录值。在我的例子中,我只是将其设置为空,因为我只需要DLT的标头。这篇文章很有帮助。Spring Cloud Stream JSON Dead Letter Queue with Avro messages