我正在尝试配置Spring Kafka,以便通过KafkaTemplate
发送的事件不包含神奇的__TypeId__
头。从我在文档和在线阅读的内容来看,JsonSerializer.ADD_TYPE_INFO_HEADERS
应该会给予我想要的结果。但我没有使用Spring的JsonSerializer
,而是使用org.apache.kafka.common.serialization.ByteArraySerializer
与ByteArrayJsonMessageConverter
的组合。所以,我很困惑,在我的情况下,该标头被设置在何处以及如何禁用它?
下面是我的配置代码:
@TestConfiguration
public static class TestKafkaProducerConfig {
@Bean
public KafkaTemplate<String, MyEvent> kafkaTemplate(
@Value("${kafka.consumer.my-topic-name}") String topicName, EmbeddedKafkaBroker embeddedKafka) {
var template = new KafkaTemplate<>(producerFactory(embeddedKafka));
template.setDefaultTopic(topicName);
var headerMapper = new SimpleKafkaHeaderMapper();
headerMapper.setMapAllStringsOut(true);
var messageConverter = new ByteArrayJsonMessageConverter();
messageConverter.setHeaderMapper(headerMapper);
template.setMessageConverter(messageConverter);
return template;
}
@Bean
public ProducerFactory<String, MyEvent> producerFactory(EmbeddedKafkaBroker embeddedKafka) {
return new DefaultKafkaProducerFactory<>(producerConfig(embeddedKafka));
}
public Map<String, Object> producerConfig(EmbeddedKafkaBroker embeddedKafka) {
Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
return props;
}
}
如果我使用JsonSerializer
,那么JsonSerializer.ADD_TYPE_INFO_HEADERS
设置是受尊重的,我不会得到__TypeId__
头集。但是,这样我就不能将我的定制MessageConverter
与SimpleKafkaHeaderMapper
和mapAllStringsOut
一起使用,这会阻止另一个魔术头spring_json_header_types
。
我的总体目标是避免Spring Kafka设置任何魔术头,如__TypeId__
和spring_json_header_types
。
2条答案
按热度按时间sr4lhrrt1#
这些头确实是从该转换器填充的。它委托给
typeMapper
:其中,
DefaultJackson2JavaTypeMapper
在上述__TypeId__
报头周围添加了一个逻辑:默认情况下,
getClassIdFieldName()
是AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME
。因此,使用您当前的
ByteArraySerializer
和ByteArrayJsonMessageConverter
组合,没有办法防止该头文件出现在ProducerRecord
中。除非您注入一个自定义的Jackson2JavaTypeMapper
,其中覆盖了fromJavaType()
和空的body。你真的可以用
DefaultKafkaHeaderMapper
和它的setMapAllStringsOut(true)
来避免它,如果你要发送的所有头都是String
或已经是byte[]
,就不会有spring_json_header_types
额外的头。请参阅此ctor
DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns)
,并仅提供您真正需要发送的那些头。4ktjp1zp2#
正如@artem-bilan所解释的那样,在使用
ByteArraySerializer
和ByteArrayJsonMessageConverter
时,要摆脱__TypeId__
并不容易。所以,我使用了JsonSerializer
和MessagingMessageConverter
。下面是我的配置,它产生了没有任何魔法头的Kafka事件。