使用SpringKafka模板时如何指定类类型?

bogh5gae  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(390)

我当前的设置具有以下配置:

  1. @Bean
  2. public ConcurrentKafkaListenerContainerFactory<String, String> myKafkaListenerContainerFactory(
  3. ConsumerFactory<String, String> consumerFactory) {
  4. ConcurrentKafkaListenerContainerFactory<String, String> factory = new
  5. ConcurrentKafkaListenerContainerFactory<>();
  6. factory.setConsumerFactory(consumerFactory);
  7. factory.setMessageConverter(stringJsonMessageConverter());
  8. return factory;
  9. }

其中stringjsonmessageconverter

  1. @Bean
  2. public StringJsonMessageConverter stringJsonMessageConverter() {
  3. return new StringJsonMessageConverter(objectMapper());
  4. }

使用我的对象Map器

  1. @Bean
  2. public ObjectMapper objectMapper() {
  3. return new ObjectMapper()
  4. .registerModule(new JavaTimeModule())
  5. .registerModule(myCustomJacksonModules())
  6. .configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
  7. .configure(ACCEPT_SINGLE_VALUE_AS_ARRAY, true)
  8. .configure(WRITE_DATES_AS_TIMESTAMPS, false);
  9. }

通过此配置,我可以发布为:

  1. ...
  2. headers = new MessageHeaders(singletonMap(TOPIC, topic));
  3. Foo foo = ....
  4. Message<?> message = new GenericMessage<>(foo, headers);
  5. kafkaTemplate.send(message);

消费为:

  1. @KafkaListener(topics = "myTopic",
  2. groupId = "g1",
  3. containerFactory = "myKafkaListenerContainerFactory")
  4. public void onMessageReceived(Foo foo) {
  5. ... works with foo here
  6. }

在本例中,如果foo是具体的类,那么这个方法很好用。然而,如果foo是一个抽象,而作为消息发送的是它的子类

  1. F1 extends Foo; F2 extends Foo ...
  2. ... // publisher
  3. headers = new MessageHeaders(singletonMap(TOPIC, topic));
  4. F1 f1 = ....
  5. Message<?> message = new GenericMessage<>(f1, headers);
  6. kafkaTemplate.send(message);
  7. ... // Listener
  8. @KafkaListener(topics = "myTopic",
  9. groupId = "g1",
  10. containerFactory = "myKafkaListenerContainerFactory")
  11. public void onMessageReceived(Foo foo) {
  12. ... wont work
  13. }

在我的使用者中将foo声明为type将失败,但这会起作用:

  1. @KafkaListener(topics = "myTopic",
  2. groupId = "g1",
  3. containerFactory = "myKafkaListenerContainerFactory")
  4. public void onMessageReceived(Map<String,Object> fooAsMap) {
  5. ... this works too
  6. }

除了上面提到的使用Map的选项外,我还尝试了以下方法:

  1. - class level kafka listener ([from here)][1]; it still requires method with param of type Map
  2. - custom deserializer registered in object mapper?

有没有办法在我发送消息时指定目标类型?

cuxqih21

cuxqih211#

配置 StringJsonMessageConverter 有一个习惯 DefaultJackson2JavaTypeMapper 那里有 setTypePrecedence(TypePrecedence.TYPE_ID) .
还可以根据发送的type id头设置Map到所需类的类型Map(仅当生产者的类与使用者的类不同时才需要)。

相关问题