我正在构建一个springboot2.4.4应用程序,使用模式验证向kafka(汇合平台)生成消息。模式是在主题的模式注册表上设置的,我已经创建了pojo( LoanInitiate
)从相关的avsc文件使用maven avro插件。使用以下方式发送时,没有问题:
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
...
@Autowired
private KafkaTemplate<String, LoanInitiate> kafkaTemplate;
@Value("${kafka.topic}")
private String topic;
public void regularSend(){
LoanInitiate li = initializeLoanInitiate();
kafkaTemplate.send(topic, li); //ok
}
但是,如果我尝试以下操作,我将得到 Unsupported Avro type.
```
@Autowired
private KafkaTemplate<String, Message> kafkaTemplate2;
public void sendMessage(){
Message message = MessageBuilder
.withPayload(li)
.setHeader(KafkaHeaders.CORRELATION_ID, "Correlation ID")
.build();
kafkaTemplate2.send(topic, message); //"Unsupported Avro type"
}
我的生产者配置如下:
Producer values
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
我知道 `kafkaTemplate2` 正在发送一个 `Message<LoanInitiate>` 而不是一个 `LoanInitiate` 但是我不清楚我缺少什么来编写消息和标题。
我应该如何发送邮件以及邮件头信息?
2条答案
按热度按时间5sxhfpxr1#
至于这个版本,这取决于Jackson是否走上了职业道路;如果是,则默认
MessagingMessageConverter
得到一个DefaultKafkaHeaderMapper
它可以Map任何json友好类型的头,否则会得到SimpleKafkaHeaderMapper
它只Map带有byte[]
价值观。所以,如果你加上
getBytes()
就价值而言,它应该起作用。您可以添加
MessagingMessageConverter
配置有SimpleKafkaHeaderMapper
到模板及其mapAllStringsOut
设置为true,这也会将标题Map为String
价值观。在入站端,可以通过设置
rawMappedHeaders
适当的财产。您还可以使用自己的自定义头Map器。
看到了吗https://docs.spring.io/spring-kafka/reference/html/#headers 以及https://docs.spring.io/spring-kafka/reference/html/#messaging-消息转换
以及Map绘制者的javadocs。
ego6inou2#
经过几次迭代,发布问题,然后休息,我发现了一个解决方案,使用
ProducerRecord
```ProducerRecord<String, LoanInitiate> producerRecord = new ProducerRecord<>(topic, li);
producerRecord
.headers()
.add(KafkaHeaders.CORRELATION_ID,
"Correlation ID".getBytes(StandardCharsets.UTF_8));