spring kafka@sendto不发送头

bwitn5fc  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(579)

我在用电脑给Kafka发信息 ReplyingKafkaTemplate 它用一个 kafka_correlationId . 然而,当它击中我的 @KafkaListener 方法并将其转发到回复主题,则标头将丢失。
如何保存Kafka头文件?
这是我的方法签名:

@KafkaListener(topics = "input")
@SendTo("reply")
public List<CustomOutput> consume(List<CustomInput> inputs) {
  ... /* some processing */
  return outputs;
}

我创造了一个 ProducerInterceptor 所以我可以看到从 ReplyingKafkaTemplate ,以及来自 @SendTo 注解。从那以后,另一件奇怪的事情是 ReplyingKafkaTemplate 没有添加记录的 kafka_replyTopic 邮件的标题。
下面是 ReplyingKafkaTemplate 已配置:

@Bean
public KafkaMessageListenerContainer<Object, Object> replyContainer(ConsumerFactory<Object, Object> cf) {
  ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);
  return new KafkaMessageListenerContainer<>(cf, containerProperties);
}

@Bean
public ReplyingKafkaTemplate<Object, Object, Object> replyingKafkaTemplate(ProducerFactory<Object, Object> pf, KafkaMessageListenerContainer<Object, Object> container) {
  return new ReplyingKafkaTemplate<>(pf, container);
}

我不确定这是否相关,但我也添加了springcloudsleuth作为依赖项,发送消息时span/trace头也在那里,但转发消息时会生成新的头。

nbewdwxp

nbewdwxp1#

默认情况下,请求消息中的任意头不会复制到回复消息中,只有 kafka_correlationId .
从版本2.2开始,您可以配置 ReplyHeadersConfigurer 调用以确定应复制哪些标头。
请参阅文档。
从版本2.2开始,您可以添加 ReplyHeadersConfigurer 到监听器容器工厂。这将用于确定要在回复消息中设置的标头。
编辑
顺便说一句,在2.2中,如果没有标题,rkt会自动设置replyto。
有了2.1.x,它是可以做到的,但是它有点复杂,你必须自己做一些工作。关键是接收和回复 Message<?> ...

@KafkaListener(id = "so55622224", topics = "so55622224")
@SendTo("dummy.we.use.the.header.instead")
public Message<?> listen(Message<String> in) {
    System.out.println(in);
    Headers nativeHeaders = in.getHeaders().get(KafkaHeaders.NATIVE_HEADERS, Headers.class);
    byte[] replyTo = nativeHeaders.lastHeader(KafkaHeaders.REPLY_TOPIC).value();
    byte[] correlation = nativeHeaders.lastHeader(KafkaHeaders.CORRELATION_ID).value();
    return MessageBuilder.withPayload(in.getPayload().toUpperCase())
            .setHeader("myHeader", nativeHeaders.lastHeader("myHeader").value())
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .build();
}

// This is used to send the reply - needs a header mapper
@Bean
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory) {
    KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
    MessagingMessageConverter messageConverter = new MessagingMessageConverter();
    messageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper("*")); // map all byte[] headers
    kafkaTemplate.setMessageConverter(messageConverter);
    return kafkaTemplate;
}

@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
    return args -> {
        Headers headers = new RecordHeaders();
        headers.add(new RecordHeader("myHeader", "myHeaderValue".getBytes()));
        headers.add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "so55622224.replies".getBytes())); // automatic in 2.2
        ProducerRecord<String, String> record = new ProducerRecord<>("so55622224", null, null, "foo", headers);
        RequestReplyFuture<String, String, String> future = template.sendAndReceive(record);
        ConsumerRecord<String, String> reply = future.get();
        System.out.println("Reply: " + reply.value() + " myHeader="
                + new String(reply.headers().lastHeader("myHeader").value()));
    };
}

相关问题