我正在构建一个kafka应用程序,在一个主题上使用日志压缩,但是我无法发送tombstone值(kafkanull)
我曾尝试为序列化程序使用默认配置,但当该配置不起作用时,我使用“publish null/tombstone message with raw headers”中的建议更改将application.properties设置为:
spring.cloud.stream.output.producer.useNativeEncoding=true
spring.cloud.stream.kafka.binder.configuration.value.serializer=org.springframework.kafka.support.serializer.JsonSerializer
我必须向流发送消息的代码是
this.stockTopics.compactedStocks().send(MessageBuilder
.withPayload(KafkaNull.INSTANCE)
.setHeader(KafkaHeaders.MESSAGE_KEY,company.getBytes())
.build())
this.stoptopics.compactedstocks()返回一个我可以向其发送消息的消息流。
每次我尝试将kafkanull示例作为有效负载发送该消息时,我都会收到错误消息 Failed to convert message: 'GenericMessage [payload=org.springframework.kafka.support.KafkaNull@1c2d8163, headers={id=f81857e7-fbd0-56f5-8418-6a1944e7f2b1, kafka_messageKey=[B@36ec022a, contentType=application/json, timestamp=1547827957485}]' to outbound message.
我希望消息只发送给消费者一个空值,但显然它是错误的。
1条答案
按热度按时间w7t8yxp51#
我为此打开了一个github问题。
编辑
解决方法-这是可行的。。。