spring云流kafka流dlq

s8vozzvw  于 2021-07-16  发布在  Java
关注(0)|答案(0)|浏览(270)

我正在使用ApacheKafka2.7.0和SpringCloudstreamKafka流。
在我的spring cloud stream(kafka streams)应用程序中,我将application.yml配置为在输入主题中的消息出现反序列化错误时使用sendtodlq机制:

spring:
  cloud:
    stream:
      function:
        definition: processor      
      bindings:         
        processor-in-0:
          destination: input-topic
          consumer:
            dlqName: input-topic-dlq
        processor-out-0:
          destination: output-topic       
      kafka:
        streams:
            binder:
              deserialization-exception-handler: sendToDlq
            configuration:
              metrics.recording.level: DEBUG
            brokers:
              - localhost:9092

我启动了我的应用程序,但我没有看到这个主题存在。文档说明,如果没有dlq主题,则将创建该主题。
如果我尝试使用dlq主题,会出现如下错误:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic input-topic-dlq --property print.value=true --property print.key=true --from-beginning
[2021-03-19 10:17:09,936] WARN [Consumer clientId=consumer-console-consumer-85295-1, groupId=console-consumer-85295] Error while fetching metadata with correlation id 2 : {input-topic-dlq=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

此时,当我查询zookeeper ls/brokers/topics时,我看到创建的主题。
现在,我尝试将非json消息发布到输入主题(我的默认反序列化程序是json)。
但是我在创建的输入主题dlq topic中看不到任何消息。
奇怪的是,我可以在默认的“error.input topic dlq.appid”主题中看到消息。
我做错什么了吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题