无法使用spring boot构造kafka使用者

0kjbasz6  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(545)

我已经实现了一个简单的使用者应用程序来使用来自主题的消息。当我运行kafka消费者应用程序时,出现了以下错误。
堆栈跟踪

  1. org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
  2. at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) ~[spring-context-5.2.9.RELEASE.jar:5.2.9.RELEASE]
  3. org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.StringSerializer is not an instance of org.apache.kafka.common.serialization.Deserializer

配置类

  1. @Configuration
  2. @EnableKafka
  3. public class KafkaConfig {
  4. private ConsumerFactory<String,String> consumerFactory()
  5. {
  6. Map<String,Object> config=new ConcurrentHashMap<>();
  7. config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
  8. config.put(ConsumerConfig.GROUP_ID_CONFIG,"group_string");
  9. config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
  10. config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class);
  11. return new DefaultKafkaConsumerFactory<>(config);
  12. }
  13. @Bean
  14. public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory()
  15. {
  16. ConcurrentKafkaListenerContainerFactory<String,String> factory
  17. =new ConcurrentKafkaListenerContainerFactory<>();
  18. factory.setConsumerFactory(consumerFactory());
  19. return factory;
  20. }
  21. }

侦听器类

  1. @Component
  2. public class KafkaConsumer {
  3. @KafkaListener(topics = {"Kafka_Example"},groupId = "group_string")
  4. public void consume(String message)
  5. {
  6. System.out.println("Consumed Message "+message);
  7. }
  8. }

pom.xml文件

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-web</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.kafka</groupId>
  7. <artifactId>spring-kafka</artifactId>
  8. <version>2.6.0</version>
  9. </dependency>
  10. <dependency>
  11. <groupId>org.springframework.kafka</groupId>
  12. <artifactId>spring-kafka-test</artifactId>
  13. <version>2.6.0</version>
  14. <scope>test</scope>
  15. </dependency>
  16. <dependency>
  17. <groupId>com.fasterxml.jackson.core</groupId>
  18. <artifactId>jackson-databind</artifactId>
  19. <version>2.11.1</version>
  20. </dependency>

注意-我的Kafka版本是2.13-2.6.0

uhry853o

uhry853o1#

您正在使用 StringSerializer 但应该使用 StringDeserializer ,一个序列化,另一个反序列化。
既然你给他们设定了 ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG 以及 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG 你显然想反序列化。

  1. config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  2. config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

相关问题