我现在陷入了一个恼人的境地。我正在尝试使用spark流与kafka和mongodb一起实现kafka偏移处理逻辑。与mongodb之间的偏移持久性执行了它应该执行的操作,但是当我尝试使用以下方法创建直接流时:
JavaInputDStream<ConsumerRecord<String, String>> events = KafkaUtils.createDirectStream(
jsc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Assign(committedOffsetRanges.keySet(),kafkaParams, committedOffsetRanges)
);
我得到以下例外情况(为了简洁起见删除了很多行):
at org.apache.spark.streaming.kafka010.ConsumerStrategies.Assign(ConsumerStrategy.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.internal.Logging
我使用以下依赖项:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark-version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${spark-version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10-assembly_2.10</artifactId>
<version>2.2.0</version>
</dependency>
${spark version}设置为“1.6.0-cdh5.12.1”。我读过 org.apache.spark.internal.Logging
存在到v 1.5.2的Spark,但不幸的是,我不能降级。嗯。。。有没有人找到了解决这个问题的办法,或者至少找到了解决办法?
暂无答案!
目前还没有任何答案,快来回答吧!