SerializationException error - Spring Boot Kafka Consumer

r55awzrz · posted 2023-10-15 in Apache

I get the following error when running my consumer:

2021-01-25 17:59:36.120 ERROR 1147 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
        at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145) ~[spring-kafka-2.6.5.jar!/:2.6.5]
        at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113) ~[spring-kafka-2.6.5.jar!/:2.6.5]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1427) [spring-kafka-2.6.5.jar!/:2.6.5]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1124) [spring-kafka-2.6.5.jar!/:2.6.5]
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [na:1.8.0_201]
        at java.util.concurrent.FutureTask.run(Unknown Source) [na:1.8.0_201]
        at java.lang.Thread.run(Unknown Source) [na:1.8.0_201]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition TestTopic at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[0, 0, 0, 0, 12, 0, 0, 16, 49, 50, 51, 52, 53, 54, 55, 56, 0, 8, 83, 66, 73, 70, 0, 2, 68, 0, -102, -103, -103, -103, -103, -103, -71, 63, 0, 20, 50, 48, 50, 48, 45, 48, 49, 45, 50, 48, 0, 20, 50, 48, 50, 48, 45, 48, 51, 45, 48, 49, 0, 20, 50, 48, 50, 48, 45, 48, 49, 45, 50, 48, 0, 2, 53]] from topic [TestTopic]
Caused by: java.io.CharConversionException: Invalid UTF-32 character 0xbff0010 (above 0x0010ffff) at char #1, byte #7)

I created the Topic_Test class from the Avro schema files.
My consumer configuration:

@Autowired
PropertyConfig propertyConfig;

private static final String TRUSTSTORE_JKS = "truststore.jks";
private static final String SASL_PROTOCOL = "SASL_SSL";
private static final String SCRAM_SHA_256 = "SCRAM-SHA-256";
private final String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
private final String consJaasCfg = String.format(jaasTemplate, "test", "test123");

private static final String TRUSTED_PACKAGE = "com.consumer.test.domain";

@Bean
public ConsumerFactory<String, Topic_Test> DtoConsumerTest() {
    final Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propertyConfig.getBootstrapServer());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, propertyConfig.getGroupId());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);

    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

    if (propertyConfig.getFlag()) {
        props.put(JsonDeserializer.TRUSTED_PACKAGES, TRUSTED_PACKAGE);
        props.put("sasl.mechanism", SCRAM_SHA_256);
        props.put("sasl.jaas.config", consJaasCfg);
        props.put("security.protocol", SASL_PROTOCOL);
        props.put("ssl.truststore.location", TRUSTSTORE_JKS);
        props.put("ssl.truststore.password", propertyConfig.getPasswordTrustore());
        props.put("ssl.endpoint.identification.algorithm", "");

        props.put("schema.registry.url", "127.0.0.1:9092");
    }

    return new DefaultKafkaConsumerFactory<>(
            props, new StringDeserializer(),
            new JsonDeserializer<>(Topic_Test.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Topic_Test> TopicTestListener() {
    ConcurrentKafkaListenerContainerFactory<String, Topic_Test> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(DtoConsumerTest());
    return factory;
}

I send messages with a producer using the following configuration:

Properties properties = new Properties();
// normal producer
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("acks", "all");
properties.setProperty("retries", "10");
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "TEST-GROUP");

properties.put("sasl.mechanism", SCRAM_SHA_256);
properties.put("sasl.jaas.config", consJaasCfg);
properties.put("security.protocol", SASL_PROTOCOL);
properties.put("ssl.truststore.location", TRUSTSTORE_JKS);
properties.put("ssl.truststore.password", "test");
properties.put("ssl.endpoint.identification.algorithm", "");

properties.put(ProducerConfig.RETRIES_CONFIG, 2); // increase to 10 from default of 0

// avro part
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);

properties.put("schema.registry.url", "http://127.0.0.1:9092");

Producer<String, Topic_Test> producer = new KafkaProducer<String, Topic_Test>(properties);

I don't know what exactly the problem is; I am using the same key and value configuration on both sides. Do I need a working schema registry?
Edit: I added error-handling logic (ErrorHandlingValidator) and now get:

2021-01-26 17:15:56.344 ERROR 11876 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception

java.lang.ArrayIndexOutOfBoundsException: 1
        at com.test.config.ConsumerConfigTest$1.handle(ConsumerConfigTest.java:109) ~[classes!/:0.0.1-SNAPSHOT]
        at com.test.config.ConsumerConfigTest$1.handle(ConsumerConfigTest.java:86) ~[classes!/:0.0.1-SNAPSHOT]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2102) [spring-kafka-2.6.5.jar!/:2.6.5]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1997) [spring-kafka-2.6.5.jar!/:2.6.5]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1924) [spring-kafka-2.6.5.jar!/:2.6.5]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1812) [spring-kafka-2.6.5.jar!/:2.6.5]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1531) [spring-kafka-2.6.5.jar!/:2.6.5]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1178) [spring-kafka-2.6.5.jar!/:2.6.5]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075) [spring-kafka-2.6.5.jar!/:2.6.5]
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [na:1.8.0_201]
        at java.util.concurrent.FutureTask.run(Unknown Source) [na:1.8.0_201]
        at java.lang.Thread.run(Unknown Source) [na:1.8.0_201]

epggiuax1#

You appear to be using JsonDeserializer in the consumer while the producer uses KafkaAvroSerializer (and KafkaAvroSerializer itself is not a valid consumer config value).
These are incompatible; your consumer's deserialization format needs to match your producer's serialization format.
So try this:

return new DefaultKafkaConsumerFactory<>(props);

after fixing

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class.getName());
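
Put together, a minimal sketch of the corrected consumer factory, under two assumptions not stated in the question: a Schema Registry is actually running, and it listens on its own endpoint (the question points schema.registry.url at the broker port 9092; the registry's usual default is 8081):

@Bean
public ConsumerFactory<String, Topic_Test> DtoConsumerTest() {
    final Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propertyConfig.getBootstrapServer());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, propertyConfig.getGroupId());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // Deserializers mirror the producer's StringSerializer/KafkaAvroSerializer pair
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            io.confluent.kafka.serializers.KafkaAvroDeserializer.class.getName());
    // Return the generated SpecificRecord class (Topic_Test) instead of GenericRecord
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    // Assumed registry address; 8081 is the usual default, not the broker port
    props.put("schema.registry.url", "http://127.0.0.1:8081");
    return new DefaultKafkaConsumerFactory<>(props);
}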

fykwrbwg2#

You can add an ErrorHandlingDeserializer to the ConsumerFactory bean.

@Bean
public ConsumerFactory<String, Topic_Test> DtoConsumerTest() {
    ErrorHandlingDeserializer<Topic_Test> errorHandlingDeserializer =
            new ErrorHandlingDeserializer<>(new JsonDeserializer<>(Topic_Test.class));
    ...
    return new DefaultKafkaConsumerFactory<>(
            props, new StringDeserializer(),
            errorHandlingDeserializer);
}
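
If you prefer configuring the wrapper through properties rather than constructing the delegate yourself, spring-kafka also supports delegating via config keys. A sketch; the Avro delegate here is an assumption based on the producer's KafkaAvroSerializer:

// ErrorHandlingDeserializer wraps the real deserializer and turns failures into
// DeserializationExceptions that the container's error handler can process
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,
        io.confluent.kafka.serializers.KafkaAvroDeserializer.class);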

Also, as the stack trace suggests, you should consider adding a custom error handler that defines the *error handling* logic and the *seek* logic to run after an error occurs.

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.ContainerAwareErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.serializer.DeserializationException;

public class KafkaErrorHandler implements ContainerAwareErrorHandler {

    private static final Logger log = LoggerFactory.getLogger(KafkaErrorHandler.class);

    @Override
    public void handle(Exception exception, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
                       MessageListenerContainer messageListenerContainer) {
        if (!records.isEmpty()) {
            // Reposition the consumer first, then log what failed
            doSeeks(records, consumer);
            Optional<ConsumerRecord<?, ?>> optionalRecord = records.stream().findFirst();
            if (optionalRecord.isPresent()) {
                ConsumerRecord<?, ?> record = optionalRecord.get();
                String topic = record.topic();
                long offset = record.offset();
                int partition = record.partition();
                if (exception instanceof DeserializationException) {
                    DeserializationException deserializationException = (DeserializationException) exception;
                    log.error("Malformed message on topic {} at offset {}: data={}, cause={}", topic, offset,
                            String.valueOf(deserializationException.getData()),
                            deserializationException.getLocalizedMessage());
                } else {
                    log.error("Exception on topic {} at offset {} (partition {}): {}", topic, offset, partition,
                            exception.getLocalizedMessage());
                }
            }
        } else {
            log.error("Exception in Kafka consumer: {}", exception.getLocalizedMessage());
        }
    }

    /**
     * Seeks past the first (failed) record, and back to the current offset
     * for every other partition in the failed batch.
     */
    private static void doSeeks(List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer) {
        Map<TopicPartition, Long> partitions = new LinkedHashMap<>();
        AtomicBoolean first = new AtomicBoolean(true);
        records.forEach((ConsumerRecord<?, ?> record) -> {
            if (first.get()) {
                // Skip the record that caused the failure
                partitions.put(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
            } else {
                // Replay the remaining, unprocessed records
                partitions.computeIfAbsent(new TopicPartition(record.topic(), record.partition()),
                        offset -> record.offset());
            }
            first.set(false);
        });
        partitions.forEach(consumer::seek);
    }
}

Use this custom error handler in the ConcurrentKafkaListenerContainerFactory, like so:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Topic_Test> TopicTestListener() {
    ...
    factory.setErrorHandler(new KafkaErrorHandler());
    return factory;
}
