Spring Kafka key serializer not working for objects

hxzsmxv2 · posted 2021-06-04 in Kafka

I can't reproduce the documentation or sample code in a way that gets a non-String key serialized.
My goal is to use the key (a field) to pass a control operation along with the data.
The classes ControlChannel and SchedulerEntry are plain POJOs.
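For reference, a minimal sketch of what these two POJOs might look like (the field names are assumptions; the post only states they are plain POJOs, and the real classes live in the io.infolayer.aida packages):

```java
// Hypothetical sketch of the two POJOs; field names are assumptions.
public class ControlChannel {
    private String action; // assumed field carrying the control operation

    public ControlChannel() { }                       // no-arg ctor for JSON deserialization
    public ControlChannel(String action) { this.action = action; }

    public String getAction() { return action; }
    public void setAction(String action) { this.action = action; }
}

class SchedulerEntry {
    private String name;   // assumed payload field

    public SchedulerEntry() { }
    public SchedulerEntry(String name) { this.name = name; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}
```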
The environment is:
Java 11
Spring Boot 2.4.1
Kafka 2.6.0
The code that needs to serialize/deserialize:
Listener and template:

@KafkaListener(topics = "Scheduler", groupId = "scheduler", containerFactory = "schedulerKafkaListenerContainerFactory")
    public void listenForScheduler(
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) ControlChannel control, 
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
        @Payload SchedulerEntry entry) {

        log.info("received data KEY ='{}'", control);
        log.info("received data PAYLOAD = '{}'", entry);

        /* ... */

    }

    @Bean
    public KafkaTemplate<ControlChannel, SchedulerEntry> schedulerKafkaTemplate() {
        return new KafkaTemplate<>(schedulerProducerFactory());
    }

First attempt: consumer and producer (type mappings and trusted packages)

@Bean
    public ProducerFactory<ControlChannel, SchedulerEntry> schedulerProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        props.put(JsonSerializer.TYPE_MAPPINGS, "key:io.infolayer.aida.ControlChannel, value:io.infolayer.aida.entity.SchedulerEntry");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props, 
            new JsonSerializer<ControlChannel>(),
            new JsonSerializer<SchedulerEntry>());
    }

    public ConsumerFactory<ControlChannel, SchedulerEntry> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, false);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        props.put(JsonDeserializer.TYPE_MAPPINGS, "key:io.infolayer.aida.ControlChannel, value:io.infolayer.aida.entity.SchedulerEntry");

        JsonDeserializer<ControlChannel> k = new JsonDeserializer<ControlChannel>();
        k.configure(props, true);

        JsonDeserializer<SchedulerEntry> v = new JsonDeserializer<SchedulerEntry>();
        k.configure(props, true);

        return new DefaultKafkaConsumerFactory<>(props, k, v);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<ControlChannel, SchedulerEntry> schedulerKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<ControlChannel, SchedulerEntry> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory("scheduler"));
        return factory;
    }

Exception:

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition Scheduler-0 at offset 25. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
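(This error means the key/value JsonDeserializer found neither type-info headers on the record, since the producer sets ADD_TYPE_INFO_HEADERS to false, nor a configured default type. For comparison only, a sketch of the equivalent declarative setup using Spring Kafka's documented spring.json.* consumer properties, with the class names taken from the post:)

```properties
# Sketch: declarative JSON key/value deserialization with explicit default types.
spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.key.default.type=io.infolayer.aida.ControlChannel
spring.kafka.consumer.properties.spring.json.value.default.type=io.infolayer.aida.entity.SchedulerEntry
spring.kafka.consumer.properties.spring.json.trusted.packages=*
```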

Second attempt: consumer and producer (just setting the key serializer/deserializer to JSON)

@Bean
    public ProducerFactory<ControlChannel, SchedulerEntry> schedulerProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    public ConsumerFactory<ControlChannel, SchedulerEntry> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props, new JsonDeserializer<>(ControlChannel.class), new JsonDeserializer<>(SchedulerEntry.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<ControlChannel, SchedulerEntry> schedulerKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<ControlChannel, SchedulerEntry> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory("scheduler"));
        return factory;
    }

Exception:

org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: 
Listener method 'public void  io.infolayer.aida.scheduler.KafkaSchedulerListener.listenForScheduler(io.infolayer.aida.ControlChannel,long,io.infolayer.aida.entity.SchedulerEntry)' 
threw exception; nested exception is org.springframework.core.convert.ConverterNotFoundException: 
No converter found capable of converting from type [io.infolayer.aida.entity.SchedulerEntry] to type [@org.springframework.messaging.handler.annotation.Header io.infolayer.aida.ControlChannel]; nested exception is org.springframework.core.convert.ConverterNotFoundException: 
No converter found capable of converting from type [io.infolayer.aida.entity.SchedulerEntry] to type [@org.springframework.messaging.handler.annotation.Header io.infolayer.aida.ControlChannel]

No answers yet.
