我正在使用ApacheKafka2.7.0和SpringCloudstreamKafka流。
在我的spring cloud stream(kafka streams)应用程序中,我将application.yml配置为在输入主题中的消息出现反序列化错误时使用sendtodlq机制:
spring:
cloud:
stream:
function:
definition: processor
bindings:
processor-in-0:
destination: input-topic
consumer:
dlqName: input-topic-dlq
processor-out-0:
destination: output-topic
kafka:
streams:
binder:
deserialization-exception-handler: sendToDlq
configuration:
metrics.recording.level: DEBUG
brokers:
- localhost:9092
我启动了我的应用程序,但我没有看到这个主题存在。文档说明,如果没有dlq主题,则将创建该主题。
如果我尝试使用dlq主题,会出现如下错误:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic input-topic-dlq --property print.value=true --property print.key=true --from-beginning
[2021-03-19 10:17:09,936] WARN [Consumer clientId=consumer-console-consumer-85295-1, groupId=console-consumer-85295] Error while fetching metadata with correlation id 2 : {input-topic-dlq=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
此时,当我查询zookeeper ls/brokers/topics时,我看到创建的主题。
现在,我尝试将非json消息发布到输入主题(我的默认反序列化程序是json)。
但是我在创建的输入主题dlq topic中看不到任何消息。
奇怪的是,我可以在默认的“error.input topic dlq.appid”主题中看到消息。
我做错什么了吗?
暂无答案!
目前还没有任何答案,快来回答吧!