我在用电脑给Kafka发信息 ReplyingKafkaTemplate
它用一个 kafka_correlationId
. 然而,当它击中我的 @KafkaListener
方法并将其转发到回复主题,则标头将丢失。
如何保存Kafka头文件?
这是我的方法签名:
@KafkaListener(topics = "input")
@SendTo("reply")
public List<CustomOutput> consume(List<CustomInput> inputs) {
... /* some processing */
return outputs;
}
我创造了一个 ProducerInterceptor
所以我可以看到从 ReplyingKafkaTemplate
,以及来自 @SendTo
注解。从那以后,另一件奇怪的事情是 ReplyingKafkaTemplate
没有添加记录的 kafka_replyTopic
邮件的标题。
下面是 ReplyingKafkaTemplate
已配置:
@Bean
public KafkaMessageListenerContainer<Object, Object> replyContainer(ConsumerFactory<Object, Object> cf) {
ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);
return new KafkaMessageListenerContainer<>(cf, containerProperties);
}
@Bean
public ReplyingKafkaTemplate<Object, Object, Object> replyingKafkaTemplate(ProducerFactory<Object, Object> pf, KafkaMessageListenerContainer<Object, Object> container) {
return new ReplyingKafkaTemplate<>(pf, container);
}
我不确定这是否相关,但我也添加了springcloudsleuth作为依赖项,发送消息时span/trace头也在那里,但转发消息时会生成新的头。
1条答案
按热度按时间nbewdwxp1#
默认情况下,请求消息中的任意头不会复制到回复消息中,只有
kafka_correlationId
.从版本2.2开始,您可以配置
ReplyHeadersConfigurer
调用以确定应复制哪些标头。请参阅文档。
从版本2.2开始,您可以添加
ReplyHeadersConfigurer
到监听器容器工厂。这将用于确定要在回复消息中设置的标头。编辑
顺便说一句,在2.2中,如果没有标题,rkt会自动设置replyto。
有了2.1.x,它是可以做到的,但是它有点复杂,你必须自己做一些工作。关键是接收和回复
Message<?>
...