Class org.apache.kafka.common.serialization.StringSerializer could not be found

fdbelqdn  posted on 2021-06-08  in  Kafka

I wrote a Kafka producer that works fine everywhere I run it, except inside the Spring XD daemon.

// Builds the producer configuration from this object's settings.
private Properties setProperties() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.getBrokerList());
    props.put(ProducerConfig.ACKS_CONFIG, this.getAcks());
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, this.getBatchSize());
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, this.getCompressionCodec());
    props.put(ProducerConfig.RETRIES_CONFIG, this.getRetries());
    props.put(ProducerConfig.LINGER_MS_CONFIG, this.getLinger());
    // The serializers are looked up by class name when the producer is created.
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, this.getKeySerializer());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, this.getValueSerializer());
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, this.getBufferMemoryConfig());
    return props;
}

When I try to run it as a Spring XD job, I get the following error:

2017-08-15T21:40:05+0000 1.2.1.RELEASE ERROR pool-11-thread-8 consumer.RabbitMQSource - foo-rabbitmq-kafka
org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.StringSerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.StringSerializer could not be found.
    at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715) ~[kafka-clients-0.11.0.0.jar!/:na]
    at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460) ~[kafka-clients-0.11.0.0.jar!/:na]
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453) ~[kafka-clients-0.11.0.0.jar!/:na]
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62) ~[kafka-clients-0.11.0.0.jar!/:na]
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75) ~[kafka-clients-0.11.0.0.jar!/:na]
    at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:359) ~[kafka-clients-0.11.0.0.jar!/:na]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:287) ~[kafka-clients-0.11.0.0.jar!/:na]
    at com.foo.dsat.producer.KafkaSink.getProducer(KafkaSink.java:54) ~[foo-rabbit.jar!/:na]
    at com.foo.dsat.producer.KafkaSink.sendMessage(KafkaSink.java:62) ~[foo-rabbit.jar!/:na]
    at com.foo.dsat.consumer.RabbitMQSource$2.handleDelivery(RabbitMQSource.java:90) ~[foo-rabbit.jar!/:na]
    at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) [amqp-client-4.2.0.jar!/:na]
    at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100) [amqp-client-4.2.0.jar!/:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_131]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_131]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
2017-08-15T21:40:05+0000 1.2.1.RELEASE ERROR pool-11-thread-8 consumer.RabbitMQSource - foo-rabbitmq-kafka
org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.StringSerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.StringSerializer could not be found.
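
A "could not be found" error for a class that is clearly inside the deployed jar often points at class loader visibility rather than a missing dependency. That is only an assumption here; the trace alone does not prove it. One way to check is to log, from the thread that creates the producer, whether the thread context class loader and the class's own loader can each resolve the serializer. The sketch below uses only the JDK; `ClassLoaderProbe`, `canLoad`, and `describe` are hypothetical names introduced for illustration.

```java
// Diagnostic sketch (hypothetical helper, not part of Kafka or Spring XD).
final class ClassLoaderProbe {

    private ClassLoaderProbe() {}

    // Returns true if `loader` can resolve `className` without initializing it.
    // A null loader means "use the bootstrap class loader".
    static boolean canLoad(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // Reports what the current thread's context loader and this class's
    // own loader can see, so the two can be compared in a log line.
    static String describe(String className) {
        ClassLoader context = Thread.currentThread().getContextClassLoader();
        ClassLoader own = ClassLoaderProbe.class.getClassLoader();
        return "context=" + context + " canLoad=" + canLoad(className, context)
                + "; own=" + own + " canLoad=" + canLoad(className, own);
    }
}
```

Logging `ClassLoaderProbe.describe("org.apache.kafka.common.serialization.StringSerializer")` just before the producer is constructed would show whether the two loaders disagree inside the container.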

I found a solution to this problem in another thread:

Thread.currentThread().setContextClassLoader(null);

But when I tried it, I got another strange error:

Could not load 'org-xerial-snappy.properties' from classpath:
java.lang.NullPointerException
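
The NullPointerException is consistent with some later code calling `Thread.currentThread().getContextClassLoader()` and dereferencing the null result, although no stack trace is shown to confirm that. A gentler variant of the same workaround, sketched here as an assumption and not a verified fix, is to set the context class loader to a real loader only for the duration of one call and restore the previous one afterwards. `ContextClassLoaderSwap` and `callWith` are hypothetical names; only JDK classes are used.

```java
import java.util.function.Supplier;

// Sketch of a scoped context class loader swap (hypothetical helper).
final class ContextClassLoaderSwap {

    private ContextClassLoaderSwap() {}

    // Runs `action` with `loader` as the thread context class loader and
    // restores the previous loader afterwards, even if the action throws.
    static <T> T callWith(ClassLoader loader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            current.setContextClassLoader(previous);
        }
    }
}
```

In the producer code above, this might wrap the `new KafkaProducer<>(props)` call, passing `KafkaProducer.class.getClassLoader()` as the loader so that the context loader is never null. Whether that resolves the error under Spring XD is untested.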

I searched Google for this error, but the results relate to older versions of Kafka, which is not what I am using:

compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '0.11.0.0'

Any suggestions?
