Why can't I set the deserializer in the consumer factory?

Posted by zlhcx6iw on 2021-06-04 in Kafka

Hi all.

package kitchen;

import entity.Order;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.converter.JsonMessageConverter;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@SpringBootApplication
public class PizzaKitchenApplication {
    public static void main(String[] args) {
        SpringApplication.run(PizzaKitchenApplication.class, args);
    }

    @Bean
    public ConsumerFactory<String, Order> consumerFactory(){
        Map<String, Object> configs = new HashMap<>();

        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "com.pizzapool");

        return new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new JsonDeserializer<>(Order.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Order> orderConcurrentKafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String, Order> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();

        containerFactory.setConsumerFactory(consumerFactory());
        containerFactory.setMessageConverter(new JsonMessageConverter());

        return containerFactory;
    }
}

This is my consumer configuration. When I run the application, I see this in the console:

value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

But I set the value deserializer to JsonDeserializer. I also tried setting it through the config map. Neither approach works. When I receive a message, I get:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void kitchen.service.KafkaOrderMessagingService.handle(entity.Order) throws javax.jms.JMSException]
Bean [kitchen.service.KafkaOrderMessagingService@13183edf]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [entity.Order] for GenericMessage

i7uaboj4 #1

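As the documentation states (https://docs.spring.io/spring-kafka/reference/html/#record-listener):
"This mechanism requires an @EnableKafka annotation on one of your @Configuration classes and a listener container factory, which is used to configure the underlying ConcurrentMessageListenerContainer. By default, a bean with name kafkaListenerContainerFactory is expected."
Your bean is named orderConcurrentKafkaListenerContainerFactory, so the listener does not pick it up and your consumer factory, with its JsonDeserializer, is never used.
You can either register the bean under the default name, or specify the name of your custom bean in the listener: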

@KafkaListener(topics = "test", containerFactory = "orderConcurrentKafkaListenerContainerFactory")

The containerFactory attribute is also useful when you need several different factories.
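
For the other route, keeping the listener annotation free of a containerFactory attribute, a minimal sketch might look like the following. It assumes Spring Kafka is on the classpath and that the ConsumerFactory<String, Order> bean from the question is available for injection. The class names KitchenKafkaConfig and OrderListener are hypothetical; the topic "test" and the handle(Order) signature are taken from the thread. Only the parts relevant to bean naming are shown.

```java
package kitchen;

import entity.Order;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Component;

// Hypothetical configuration class, used here only for illustration.
@Configuration
@EnableKafka
class KitchenKafkaConfig {

    // Register the factory under the default name that @KafkaListener looks up,
    // so listeners without a containerFactory attribute use it.
    @Bean(name = "kafkaListenerContainerFactory")
    ConcurrentKafkaListenerContainerFactory<String, Order> orderContainerFactory(
            ConsumerFactory<String, Order> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, Order> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // The injected consumer factory is the one from the question,
        // built with StringDeserializer and JsonDeserializer<Order>.
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }
}

// Hypothetical listener class; the topic name "test" comes from the answer above.
@Component
class OrderListener {

    // No containerFactory attribute: the bean named
    // kafkaListenerContainerFactory is used by default.
    @KafkaListener(topics = "test")
    void handle(Order order) {
        System.out.println("Received order: " + order);
    }
}
```

With either approach, the startup log should then report the configured JsonDeserializer as value.deserializer for the listener's consumer instead of StringDeserializer.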
