spring kafka JSondeserialization messageconversionexception未能解析类名找不到类

3mpgtkmj  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(420)

我有两个服务,应该通过 Kafka . 让我们调用第一个服务writeservice和第二个服务queryservice。
在writeservice方面,我有以下生产者配置。

  1. @Configuration
  2. public class KafkaProducerConfiguration {
  3. @Value("${spring.kafka.bootstrap-servers}")
  4. private String bootstrapServers;
  5. @Bean
  6. public Map<String, Object> producerConfigs() {
  7. Map<String, Object> props = new HashMap<>();
  8. // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
  9. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
  10. bootstrapServers);
  11. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  12. StringSerializer.class);
  13. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  14. JsonSerializer.class);
  15. return props;
  16. }
  17. @Bean
  18. public ProducerFactory<String, Object> producerFactory() {
  19. return new DefaultKafkaProducerFactory<>(producerConfigs());
  20. }
  21. @Bean
  22. public KafkaTemplate<String, Object> kafkaTemplate() {
  23. return new KafkaTemplate<>(producerFactory());
  24. }
  25. }

我正试图发送一个类的对象 com.example.project.web.routes.dto.RouteDto 在queryservice端,使用者配置定义如下。

  1. @Configuration
  2. @EnableKafka
  3. public class KafkaConsumerConfiguration {
  4. @Value("${spring.kafka.bootstrap-servers}")
  5. private String bootstrapServers;
  6. @Value("${spring.kafka.groupid}")
  7. private String serviceGroupId;
  8. @Value("${spring.kafka.consumer.trusted-packages}")
  9. private String trustedPackage;
  10. @Bean
  11. public Map<String, Object> consumerConfigs() {
  12. Map<String, Object> props = new HashMap<>();
  13. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
  14. bootstrapServers);
  15. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
  16. StringDeserializer.class);
  17. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
  18. JsonDeserializer.class);
  19. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  20. props.put(ConsumerConfig.GROUP_ID_CONFIG, serviceGroupId);
  21. props.put(JsonDeserializer.TRUSTED_PACKAGES, trustedPackage);
  22. props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  23. return props;
  24. }
  25. @Bean
  26. public ConsumerFactory<String, Object> consumerFactory() {
  27. return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  28. }
  29. @Bean
  30. public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
  31. ConcurrentKafkaListenerContainerFactory<String, Object> factory =
  32. new ConcurrentKafkaListenerContainerFactory<>();
  33. factory.setConsumerFactory(consumerFactory());
  34. return factory;
  35. }
  36. }

侦听器具有以下定义。负载类具有完全限定名- com.example.project.clientqueryview.module.routes.messaging.kafka.RouteDto ```
@KafkaListener(topics = "${spring.kafka.topics.routes}",
containerFactory = "kafkaListenerContainerFactory")
public void listenForRoute(ConsumerRecord<String, RouteDto> cr,
@Payload RouteDto payload) {
logger.info("Logger 1 [JSON] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
typeIdHeader(cr.headers()), payload, cr.toString());
}

  1. private static String typeIdHeader(Headers headers) {
  2. return StreamSupport.stream(headers.spliterator(), false)
  3. .filter(header -> header.key().equals("__TypeId__"))
  4. .findFirst().map(header -> new String(header.value())).orElse("N/A");
  5. }
  1. 当一个消息被发送时,我得到以下错误
  2. 原因:org.springframework.messaging.converter.messageconversionexception:未能解析类名。找不到类[com.example.project.web.routes.dto.routedto];嵌套的异常是java.lang.classnotfoundexception:com.example.project.web.routes.dto.routedto
  3. 这个错误很明显。然而,我不明白为什么它默认有这种行为。我不希望在不同的服务中有相同的套餐,这完全没有意义。
  4. 我还没有找到一种方法来禁用它,并使用提供给侦听器的类,用 `@Payload` 如何解决这个问题,而不必手动配置Map器?
4c8rllxm

4c8rllxm1#

如果您使用的是spring-kafka-2.2.x,那么可以通过重载 JsonDeserializer 文件
从版本2.2开始,您可以显式配置反序列化程序,通过使用一个具有布尔useheadersifpresent的重载构造函数(默认情况下为true),使用提供的目标类型并忽略标头中的类型信息。下面的示例演示如何执行此操作:

  1. DefaultKafkaConsumerFactory<String, Object> cf = new DefaultKafkaConsumerFactory<>(props,
  2. new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));

如果使用较低版本,则使用 MessageConverter (您可能会在spring-kafka-2.1.x及更高版本中看到此问题)
spring for apache kafka通过messagingmessageconverter实现及其stringjsonmessageconverter和bytesjsonmessageconverter定制提供了messageconverter抽象。可以直接将messageconverter注入到kafkatemplate示例中,也可以使用@kafkalistener.containerfactory()属性的abstractkafkalistenerContainerFactorybean定义。下面的示例演示如何执行此操作:

  1. @Bean
  2. public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
  3. ConcurrentKafkaListenerContainerFactory<String, RouteDto> factory =
  4. new ConcurrentKafkaListenerContainerFactory<>();
  5. factory.setConsumerFactory(consumerFactory());
  6. factory.setMessageConverter(new StringJsonMessageConverter());
  7. return factory;
  8. }
  9. @KafkaListener(topics = "jsonData",
  10. containerFactory = "kafkaJsonListenerContainerFactory")
  11. public void jsonListener(RouteDto dto) {
  12. ...
  13. }

注意:只有在方法级别声明@kafkalistener注解时,才能实现这种类型推断。对于类级别的@kafkalistener,有效负载类型用于选择要调用的@kafkahandler方法,因此在选择该方法之前,必须已经对其进行了转换。

展开查看全部

相关问题