quarkus kafka将异常反序列化到死信队列

vktxenjb  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(420)

为了满足服务的可靠性,我需要将所有无法反序列化的传入消息推送到一个死信主题中 kafka-smallrye 以及 quarkus .
主题上的所有消息都应该是avro格式的(但我不能确定),并且在模式注册表上有一个define模式。
我已按以下方式设置我的使用者的配置:

mp:
  messaging:
    incoming:
      test-in:
        connector: smallrye-kafka
        group:
          id: test-in-consumer-group
        topic: events-topic
        failure-strategy: dead-letter-queue
        schema:
          registry:
            url: http://localhost:8081
        value:
          deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
          subject:
            name:
              strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
        key:
          deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
          subject:
            name:
              strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
        specific:
          avro:
            reader: true

我的消费者代码:

@ApplicationScoped
public class Consumer {

    @Incoming("test-in")
    public CompletionStage<Void> store(KafkaRecord<Key,SpecificRecord> data ){
            String schemaFullName = data.getPayload().getSchema().getFullName();
            System.out.println(schemaFullName);

            // other consumer code
            return data.ack();
    }
}

当使用者无法反序列化消息时,消费进程将被阻止,而不是将消息移到死信并继续。我认为反序列化错误不会产生 nack 所以这条信息不能移到死信上。
有没有一种方法可以将不值得使用的消息移到死信主题?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题