spring云流kafka流dlq

s8vozzvw  于 2021-07-16  发布在  Java
关注(0)|答案(0)|浏览(307)

我正在使用ApacheKafka2.7.0和SpringCloudstreamKafka流。
在我的spring cloud stream(kafka streams)应用程序中,我将application.yml配置为在输入主题中的消息出现反序列化错误时使用sendtodlq机制:

  1. spring:
  2. cloud:
  3. stream:
  4. function:
  5. definition: processor
  6. bindings:
  7. processor-in-0:
  8. destination: input-topic
  9. consumer:
  10. dlqName: input-topic-dlq
  11. processor-out-0:
  12. destination: output-topic
  13. kafka:
  14. streams:
  15. binder:
  16. deserialization-exception-handler: sendToDlq
  17. configuration:
  18. metrics.recording.level: DEBUG
  19. brokers:
  20. - localhost:9092

我启动了我的应用程序,但我没有看到这个主题存在。文档说明,如果没有dlq主题,则将创建该主题。
如果我尝试使用dlq主题,会出现如下错误:

  1. kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic input-topic-dlq --property print.value=true --property print.key=true --from-beginning
  2. [2021-03-19 10:17:09,936] WARN [Consumer clientId=consumer-console-consumer-85295-1, groupId=console-consumer-85295] Error while fetching metadata with correlation id 2 : {input-topic-dlq=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

此时,当我查询zookeeper ls/brokers/topics时,我看到创建的主题。
现在,我尝试将非json消息发布到输入主题(我的默认反序列化程序是json)。
但是我在创建的输入主题dlq topic中看不到任何消息。
奇怪的是,我可以在默认的“error.input topic dlq.appid”主题中看到消息。
我做错什么了吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题