如何使用kafkatemplate设置消息头?

0aydgbwb  于 2021-07-12  发布在  Java
关注(0)|答案(2)|浏览(1331)

我正在构建一个springboot2.4.4应用程序,使用模式验证向kafka(汇合平台)生成消息。模式是在主题的模式注册表上设置的,我已经创建了pojo( LoanInitiate )从相关的avsc文件使用maven avro插件。使用以下方式发送时,没有问题:

import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
...

@Autowired
private KafkaTemplate<String, LoanInitiate> kafkaTemplate;

@Value("${kafka.topic}")
private String topic;

public void regularSend(){
    LoanInitiate li = initializeLoanInitiate();
    kafkaTemplate.send(topic, li); //ok
}

但是,如果我尝试以下操作,我将得到 Unsupported Avro type. ```
@Autowired
private KafkaTemplate<String, Message> kafkaTemplate2;

public void sendMessage(){
Message message = MessageBuilder
.withPayload(li)
.setHeader(KafkaHeaders.CORRELATION_ID, "Correlation ID")
.build();
kafkaTemplate2.send(topic, message); //"Unsupported Avro type"

}

我的生产者配置如下:

Producer values

spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer

我知道 `kafkaTemplate2` 正在发送一个 `Message<LoanInitiate>` 而不是一个 `LoanInitiate` 但是我不清楚我缺少什么来编写消息和标题。
我应该如何发送邮件以及邮件头信息?
5sxhfpxr

5sxhfpxr1#

至于这个版本,这取决于Jackson是否走上了职业道路;如果是,则默认 MessagingMessageConverter 得到一个 DefaultKafkaHeaderMapper 它可以Map任何json友好类型的头,否则会得到 SimpleKafkaHeaderMapper 它只Map带有 byte[] 价值观。
所以,如果你加上 getBytes() 就价值而言,它应该起作用。
您可以添加 MessagingMessageConverter 配置有 SimpleKafkaHeaderMapper 到模板及其 mapAllStringsOut 设置为true,这也会将标题Map为 String 价值观。
在入站端,可以通过设置 rawMappedHeaders 适当的财产。
您还可以使用自己的自定义头Map器。
看到了吗https://docs.spring.io/spring-kafka/reference/html/#headers 以及https://docs.spring.io/spring-kafka/reference/html/#messaging-消息转换
以及Map绘制者的javadocs。

ego6inou

ego6inou2#

经过几次迭代,发布问题,然后休息,我发现了一个解决方案,使用 ProducerRecord ```
ProducerRecord<String, LoanInitiate> producerRecord = new ProducerRecord<>(topic, li);
producerRecord
.headers()
.add(KafkaHeaders.CORRELATION_ID,
"Correlation ID".getBytes(StandardCharsets.UTF_8));

        kafkaTemplate.send(producerRecord);

相关问题