反序列化pojo而不使用类型信息

ecbunoof  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(344)

我正在开发一个分布式微服务应用程序,它使用kafka进行内部通信。应用程序在主题上交换pojo。当生产者向使用者发送消息时,默认情况下会添加一个头,指示有效负载中对象的包名和类名。然后,使用者应用程序使用此信息反序列化有效负载。但这要求我在两个应用程序的同一个包中定义完全相同的类,这并不能为我带来一个好的设计。如果我将生产者端的配置(jsonserializer.add\u type\u info\u headers)设置为不发送类型输入头,则会导致使用者端出错。另外,我不想在使用者应用程序上使用默认类型,因为它有多个侦听器,它们需要不同类型的对象。为什么kafkalistener不能简单地将json负载反序列化为参数中给定的对象类型,为什么它需要头呢?
为了解决这个问题,我在消费者应用程序上定义了一个带有“bytesdeserialser”的consumerfactory和一个带有“bytesjsonmessageconverter”的kafkalistenercontainerfactory。有了这个,它在消费者端起作用了,但是我不知道如何在生产者端起作用,同时使用replyingkafkatemplate和反序列化消费者的回复。
下面是我的配置-//生产者配置

@Bean
public Map<String, Object> producerConfigs() {
  Map<String, Object> props = new HashMap<>();
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
  props.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.common.adapter.model.response.AccountResponse");
  return props;
}

@Bean
public ProducerFactory<String, Object> replyProducerFactory() {
  return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public KafkaTemplate<String, Object> replyTemplate() {
  return new KafkaTemplate<>(replyProducerFactory());
}

//consumer configs
@Bean
public ReplyingKafkaTemplate<String, Object, Object> replyingKafkaTemplate() {
    ReplyingKafkaTemplate<String, Object, Object> replyingKafkaTemplate =
    new ReplyingKafkaTemplate<>(requestProducerFactory(), replyListenerContainer());
    replyingKafkaTemplate.setReplyTimeout(10000);
    replyingKafkaTemplate.setMessageConverter(converter());
    return replyingKafkaTemplate;
}

@Bean
public KafkaMessageListenerContainer<String, Object> replyListenerContainer() {
    ContainerProperties containerProperties = new ContainerProperties(replyTopic);
    return new KafkaMessageListenerContainer<>(replyConsumerFactory(), containerProperties);
}

@Bean
public ConsumerFactory<String, Object> replyConsumerFactory() {
    JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>();
    jsonDeserializer.addTrustedPackages("*");
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), jsonDeserializer);
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.trader.account.model.response.AccountResponse");
    return props;
}
b4wnujal

b4wnujal1#

可以使用类型Map。
制作人Map com.acme.Foofoo 以及消费者Map foocom.other.Bar .
这些类型必须在json级别兼容。
如果只接收一个类型,则可以将反序列化程序配置为使用该类型,而不是查找包含类型信息的标头。
https://docs.spring.io/spring-kafka/docs/2.5.2.release/reference/html/#serdes-json配置 JsonDeserializer.KEY_DEFAULT_TYPE :用于在不存在标头信息时反序列化键的回退类型。 JsonDeserializer.VALUE_DEFAULT_TYPE :用于在不存在标头信息时反序列化值的回退类型。
从版本2.5开始,您可以添加一个反序列化程序将调用的函数,以便您可以内省数据以确定类型。
请参见使用方法确定类型。
这(和类型Map)是处理回复模板中多个类型的唯一方法。在使用者方面,我们可以根据方法参数推断类型(这是在那里使用的正确机制-它不是“变通方法”)。

相关问题