Java: Avoid the __TypeId__ and spring_json_header_types magic headers set by Spring Kafka

laik7k3q · asked on 2023-04-04 · in Java

I am trying to configure Spring Kafka so that events sent via KafkaTemplate do not carry the magic __TypeId__ header. From what I have read in the documentation and online, JsonSerializer.ADD_TYPE_INFO_HEADERS should give me what I want. However, I am not using Spring's JsonSerializer; I am using a combination of org.apache.kafka.common.serialization.ByteArraySerializer and ByteArrayJsonMessageConverter. So I am confused: in my setup, where is that header being set, and how do I disable it?
Here is my configuration code:

@TestConfiguration
public static class TestKafkaProducerConfig {

    @Bean
    public KafkaTemplate<String, MyEvent> kafkaTemplate(
            @Value("${kafka.consumer.my-topic-name}") String topicName, EmbeddedKafkaBroker embeddedKafka) {

        var template = new KafkaTemplate<>(producerFactory(embeddedKafka));
        template.setDefaultTopic(topicName);

        var headerMapper = new SimpleKafkaHeaderMapper();
        headerMapper.setMapAllStringsOut(true);

        var messageConverter = new ByteArrayJsonMessageConverter();
        messageConverter.setHeaderMapper(headerMapper);

        template.setMessageConverter(messageConverter);
        return template;
    }

    @Bean
    public ProducerFactory<String, MyEvent> producerFactory(EmbeddedKafkaBroker embeddedKafka) {
        return new DefaultKafkaProducerFactory<>(producerConfig(embeddedKafka));
    }

    public Map<String, Object> producerConfig(EmbeddedKafkaBroker embeddedKafka) {
        Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        return props;
    }
}

If I use JsonSerializer, the JsonSerializer.ADD_TYPE_INFO_HEADERS setting is respected and I do not get the __TypeId__ header. But then I cannot use my custom MessageConverter with SimpleKafkaHeaderMapper and its mapAllStringsOut, which is what keeps the other magic header, spring_json_header_types, away.
My overall goal is to stop Spring Kafka from setting any magic headers such as __TypeId__ and spring_json_header_types.


sr4lhrrt #1

Those headers are indeed populated by that converter. It delegates to the typeMapper:

this.typeMapper.fromClass(message.getPayload().getClass(), headers);

where DefaultJackson2JavaTypeMapper applies this logic around the __TypeId__ header mentioned above:

public void fromJavaType(JavaType javaType, Headers headers) {
    String classIdFieldName = getClassIdFieldName();
    if (headers.lastHeader(classIdFieldName) != null) {
        removeHeaders(headers);
    }

    addHeader(headers, classIdFieldName, javaType.getRawClass());
    // ...
}

By default, getClassIdFieldName() returns AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME, which is __TypeId__.
So, with your current ByteArraySerializer + ByteArrayJsonMessageConverter combination, there is no way to keep that header out of the ProducerRecord, unless you inject a custom Jackson2JavaTypeMapper that overrides fromJavaType() with an empty body.
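
For illustration, a minimal sketch of that last suggestion. The class name NoTypeIdTypeMapper is made up here, and it assumes the converter exposes setTypeMapper(...), as JsonMessageConverter does in recent spring-kafka versions; exact package names may vary by version.

import com.fasterxml.jackson.databind.JavaType;
import org.apache.kafka.common.header.Headers;
import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;

// A type mapper that never writes type information, so no __TypeId__ header is added.
public class NoTypeIdTypeMapper extends DefaultJackson2JavaTypeMapper {

    @Override
    public void fromJavaType(JavaType javaType, Headers headers) {
        // Intentionally empty: skip writing the __TypeId__ header on outbound records.
    }
}

It would then be plugged into the converter from the configuration above:

var messageConverter = new ByteArrayJsonMessageConverter();
messageConverter.setTypeMapper(new NoTypeIdTypeMapper());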
You really can avoid it with a DefaultKafkaHeaderMapper and its setMapAllStringsOut(true): if all the headers you are going to send are Strings or already byte[], there will be no extra spring_json_header_types header.
See this ctor, DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns), and provide only those headers you really need to send.
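
Putting that together, a hedged sketch of how the kafkaTemplate bean from the question could look. The header-name patterns "x-correlation-id" and "x-tenant-id" are placeholders for whatever headers you actually send, and this covers only the spring_json_header_types side; it can be combined with a custom type mapper such as the NoTypeIdTypeMapper sketch above for __TypeId__.

// Requires com.fasterxml.jackson.databind.ObjectMapper and
// org.springframework.kafka.support.DefaultKafkaHeaderMapper.
@Bean
public KafkaTemplate<String, MyEvent> kafkaTemplate(
        @Value("${kafka.consumer.my-topic-name}") String topicName, EmbeddedKafkaBroker embeddedKafka) {

    var template = new KafkaTemplate<>(producerFactory(embeddedKafka));
    template.setDefaultTopic(topicName);

    // Map outbound only the headers matching these (illustrative) patterns.
    var headerMapper = new DefaultKafkaHeaderMapper(new ObjectMapper(), "x-correlation-id", "x-tenant-id");
    // String-valued headers go out as raw byte[] (UTF-8 by default), so nothing is
    // JSON-encoded and no spring_json_header_types header is recorded.
    headerMapper.setMapAllStringsOut(true);

    var messageConverter = new ByteArrayJsonMessageConverter();
    messageConverter.setHeaderMapper(headerMapper);

    template.setMessageConverter(messageConverter);
    return template;
}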


4ktjp1zp #2

As @artem-bilan explained, it is not easy to get rid of __TypeId__ when using ByteArraySerializer with ByteArrayJsonMessageConverter. So I switched to JsonSerializer with MessagingMessageConverter. Below is my configuration, which produces Kafka events without any magic headers.

@TestConfiguration
public static class TestKafkaProducerConfig {

    @Bean
    public KafkaTemplate<String, MyEvent> kafkaTemplate(
            @Value("${kafka.consumer.my-topic-name}") String topicName, EmbeddedKafkaBroker embeddedKafka) {

        var template = new KafkaTemplate<>(producerFactory(embeddedKafka));
        template.setDefaultTopic(topicName);

        // Use SimpleKafkaHeaderMapper, which does not add the JSON types header, `spring_json_header_types`.
        // With mapAllStringsOut set to true, all string-valued headers are converted to byte[] using
        // the charset property (UTF-8 by default).
        var headerMapper = new SimpleKafkaHeaderMapper();
        headerMapper.setMapAllStringsOut(true);

        var messageConverter = new MessagingMessageConverter();
        messageConverter.setHeaderMapper(headerMapper);

        template.setMessageConverter(messageConverter);
        return template;
    }

    @Bean
    public ProducerFactory<String, MyEvent> producerFactory(EmbeddedKafkaBroker embeddedKafka) {
        return new DefaultKafkaProducerFactory<>(producerConfig(embeddedKafka));
    }

    public Map<String, Object> producerConfig(EmbeddedKafkaBroker embeddedKafka) {
        Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        return props;
    }
}
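
As a sanity check, here is a hedged verification sketch of my own (the consumer setup, group id, and AssertJ assertions are assumptions, not part of the original answer): it reads one record back from the embedded broker and asserts that neither magic header is present.

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.utils.KafkaTestUtils;

public class NoMagicHeadersAssertions {

    public static void assertNoMagicHeaders(EmbeddedKafkaBroker embeddedKafka, String topicName) {
        Map<String, Object> consumerProps =
                KafkaTestUtils.consumerProps("no-magic-headers-group", "true", embeddedKafka);
        // Make sure a record produced before the consumer joined is still visible.
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        try (Consumer<String, byte[]> consumer = new DefaultKafkaConsumerFactory<>(
                consumerProps, new StringDeserializer(), new ByteArrayDeserializer()).createConsumer()) {
            embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topicName);
            ConsumerRecord<String, byte[]> record = KafkaTestUtils.getSingleRecord(consumer, topicName);
            assertThat(record.headers().lastHeader("__TypeId__")).isNull();
            assertThat(record.headers().lastHeader("spring_json_header_types")).isNull();
        }
    }
}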
