When I run my consumer, I get the following error:
2021-01-25 17:59:36.120 ERROR 1147 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145) ~[spring-kafka-2.6.5.jar!/:2.6.5]
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113) ~[spring-kafka-2.6.5.jar!/:2.6.5]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1427) [spring-kafka-2.6.5.jar!/:2.6.5]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1124) [spring-kafka-2.6.5.jar!/:2.6.5]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [na:1.8.0_201]
at java.util.concurrent.FutureTask.run(Unknown Source) [na:1.8.0_201]
at java.lang.Thread.run(Unknown Source) [na:1.8.0_201]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition TestTopic at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[0, 0, 0, 0, 12, 0, 0, 16, 49, 50, 51, 52, 53, 54, 55, 56, 0, 8, 83, 66, 73, 70, 0, 2, 68, 0, -102, -103, -103, -103, -103, -103, -71, 63, 0, 20, 50, 48, 50, 48, 45, 48, 49, 45, 50, 48, 0, 20, 50, 48, 50, 48, 45, 48, 51, 45, 48, 49, 0, 20, 50, 48, 50, 48, 45, 48, 49, 45, 50, 48, 0, 2, 53]] from topic [TestTopic]
Caused by: java.io.CharConversionException: Invalid UTF-32 character 0xbff0010 (above 0x0010ffff) at char #1, byte #7)
I generated the Topic_TEST class from the Avro schema files.
My consumer configuration:
@Autowired
PropertyConfig propertyConfig;
private final static String TRUSTSTORE_JKS = "truststore.jks";
private final static String SASL_PROTOCOL = "SASL_SSL";
private final static String SCRAM_SHA_256 = "SCRAM-SHA-256";
private final String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
private final String consJaasCfg = String.format(jaasTemplate, "test", "test123");
private static final String TRUSTED_PACKAGE = "com.consumer.test.domain";
@Bean
public ConsumerFactory<String, Topic_Test> DtoConsumerTest() {
final Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propertyConfig.getBootstrapServer());
props.put(ConsumerConfig.GROUP_ID_CONFIG, propertyConfig.getGroupId());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
if(propertyConfig.getFlag())
{
props.put(JsonDeserializer.TRUSTED_PACKAGES, TRUSTED_PACKAGE);
props.put("sasl.mechanism", SCRAM_SHA_256);
props.put("sasl.jaas.config", consJaasCfg);
props.put("security.protocol", SASL_PROTOCOL);
props.put("ssl.truststore.location", TRUSTSTORE_JKS);
props.put("ssl.truststore.password", propertyConfig.getPasswordTrustore());
props.put("ssl.endpoint.identification.algorithm", "");
props.put("schema.registry.url", "127.0.0.1:9092");
}
return new DefaultKafkaConsumerFactory<>(
props, new StringDeserializer(),
new JsonDeserializer<>(Topic_Test.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Topic_Test> TopicTestListener() {
ConcurrentKafkaListenerContainerFactory<String, Topic_Test> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(DtoConsumerTest());
return factory;
}
I send messages with a producer using the following configuration:
Properties properties = new Properties();
// normal producer
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("acks", "all");
properties.setProperty("retries", "10");
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "TEST-GROUP");
properties.put("sasl.mechanism", SCRAM_SHA_256);
properties.put("sasl.jaas.config", consJaasCfg);
properties.put("security.protocol", SASL_PROTOCOL);
properties.put("ssl.truststore.location", TRUSTSTORE_JKS);
properties.put("ssl.truststore.password", "test");
properties.put("ssl.endpoint.identification.algorithm", "");
properties.put(ProducerConfig.RETRIES_CONFIG, 2); //increase to 10 from default of 0
// avro part
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
properties.put("schema.registry.url", "http://127.0.0.1:9092");
Producer<String, Topic_TEST> producer = new KafkaProducer<String, Topic_TEST>(properties);
I don't know what the problem actually is; I use the same key and value configuration for Kafka on both sides. Do I need a working Schema Registry?
Edit: I added error-handling logic (ErrorHandlingValidator) and now get:
2021-01-26 17:15:56.344 ERROR 11876 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception
java.lang.ArrayIndexOutOfBoundsException: 1
at com.test.config.ConsumerConfigTest$1.handle(ConsumerConfigTest.java:109) ~[classes!/:0.0.1-SNAPSHOT]
at com.test.config.ConsumerConfigTest$1.handle(ConsumerConfigTest.java:86) ~[classes!/:0.0.1-SNAPSHOT]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2102) [spring-kafka-2.6.5.jar!/:2.6.5]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1997) [spring-kafka-2.6.5.jar!/:2.6.5]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1924) [spring-kafka-2.6.5.jar!/:2.6.5]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1812) [spring-kafka-2.6.5.jar!/:2.6.5]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1531) [spring-kafka-2.6.5.jar!/:2.6.5]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1178) [spring-kafka-2.6.5.jar!/:2.6.5]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075) [spring-kafka-2.6.5.jar!/:2.6.5]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [na:1.8.0_201]
at java.util.concurrent.FutureTask.run(Unknown Source) [na:1.8.0_201]
at java.lang.Thread.run(Unknown Source) [na:1.8.0_201]
2 Answers
Answer 1:
You appear to be using JsonDeserializer on the consumer while the producer uses KafkaAvroSerializer (and KafkaAvroSerializer is not a valid value for a consumer's deserializer config in any case). The two are incompatible; your consumer's deserialization format must match your producer's serialization format. So try KafkaAvroDeserializer on the consumer side instead.
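A sketch of what the corrected consumer factory could look like, assuming Confluent's kafka-avro-serializer dependency is on the classpath and the Schema Registry listens on http://127.0.0.1:8081 (the 9092 used in the question is the broker port, not the registry; the registry URL here is an assumption):

```java
// Sketch only: a consumer factory matched to the Avro producer.
@Bean
public ConsumerFactory<String, Topic_Test> DtoConsumerTest() {
    final Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propertyConfig.getBootstrapServer());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, propertyConfig.getGroupId());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // Deserializers (not serializers) on the consumer side:
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            org.apache.kafka.common.serialization.StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
    // Deserialize into the generated Topic_Test class, not GenericRecord:
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    props.put("schema.registry.url", "http://127.0.0.1:8081"); // assumed port
    // ... keep the SASL/SSL properties from the question unchanged ...
    return new DefaultKafkaConsumerFactory<>(props);
}
```

Note that the deserializers are configured only through the properties map; do not also pass a JsonDeserializer instance to the DefaultKafkaConsumerFactory constructor, or it will override the Avro settings.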
Answer 2:
You can add an ErrorHandlingDeserializer to the ConsumerFactory bean. Also, as the stack trace suggests, consider configuring an error handler that defines the *error-handling* and *seek* logic to run after a record fails deserialization, and register it on the ConcurrentKafkaListenerContainerFactory.
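A minimal sketch of that wiring, assuming the spring-kafka 2.6.x API shown in the stack trace; the props map is the one built in the ConsumerFactory bean, and DtoConsumerTest()/Topic_Test are the names used in the question:

```java
// Sketch only: wrap the value deserializer in ErrorHandlingDeserializer so a
// poison record surfaces as a DeserializationException instead of killing the
// container, then bound the retries with a SeekToCurrentErrorHandler.
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        ErrorHandlingDeserializer.class);
// Delegate that does the real deserialization inside the wrapper:
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,
        io.confluent.kafka.serializers.KafkaAvroDeserializer.class);

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Topic_Test> topicTestListener() {
    ConcurrentKafkaListenerContainerFactory<String, Topic_Test> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(DtoConsumerTest());
    // Retry a failed record twice, one second apart, then log and skip it:
    factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(1000L, 2L)));
    return factory;
}
```

With this in place, a record that cannot be deserialized no longer loops forever at offset 0: the ErrorHandlingDeserializer hands the failure to the container's error handler, which seeks past the record once the back-off is exhausted.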