我用spark从Kafka的一个主题中获取数据。我必须向Kafka夫罗德里亚泽提供数据。我将Kafka消费者配置为:
kafkaParams.put("bootstrap.servers", "10.0.4.215:9092");
kafkaParams.put("key.deserializer", io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
kafkaParams.put("value.deserializer",io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
// kafkaParams.put("key.convert", com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter.class);
//kafkaParams.put("value.convert",com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", false);
但是当我执行代码时,线程中有一个异常
“streaming start”java.lang.noclassdeffounderror:io/confluent/common/config/configexception
有人能告诉我在哪里能找到这个班级吗?例如maven dependency ext。
2条答案
按热度按时间nhaq1z211#
您需要以下依赖项:group:'io.confluent',name:'common config',version:yourconfluentversion
yqyhoc1h2#
我也有同样的问题。我曾经
5.1.0
合流平台版本。我检查了兼容性kafka <-> confluent
并发现有更新的版本在同一级别的兼容性。我把版本更新到5.1.1
它为我解决了这个问题。例如: