Kafka 0.9.0 and Spark Streaming 2.1.0: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker

dsekswqp · asked 2021-06-07 · Kafka

I am having a problem with Spark Streaming 2.1.0, Scala 2.11.8, and Kafka 0.9.0. These are my dependencies:

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "2.1.0",
    "org.apache.spark" %% "spark-streaming" % "2.1.0",
    "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.1.0",
    "org.apache.kafka" % "kafka-clients" % "0.9.0.0",
    "org.apache.kafka" %% "kafka" % "0.9.0.0"
),

And this is my Spark Streaming code that connects to the Kafka server:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

def initializeKafka(ssc: StreamingContext, topic: String): InputDStream[(String, String)] = {
    val lioncubConf = Configuration  // project configuration object, defined elsewhere

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "debian:9092",
      "group.id" -> "222",
      "auto.offset.reset" -> "smallest",
      "enable.auto.commit" -> "false")

    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set(topic))
}
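
For context, a minimal driver calling this method might look like the sketch below. The stack trace further down shows that the real entry point is hfu.lioncub.LioncubStream calling hfu.lioncub.KafkaInitializer, but the app name, master, batch interval, and topic name here are placeholders, not taken from the question.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object LioncubStream {
  def main(args: Array[String]): Unit = {
    // Placeholder settings; any master and batch interval would do for this sketch.
    val conf = new SparkConf().setAppName("lioncub-stream").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // The direct stream yields (key, message) pairs; keep only the message values.
    val stream = KafkaInitializer.initializeKafka(ssc, "test")
    stream.map(_._2).print()

    ssc.start()
    ssc.awaitTermination()
  }
}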

I am running kafka_2.11-0.9.0.0 on a Debian VM whose hostname is mapped in my /etc/hosts. There is only one broker. The topic was created following the Kafka quickstart (https://kafka.apache.org/090/documentation/#quickstart). I have to use this version of Kafka because it is the one running on the cluster where my software has to be deployed.
This is the error I am getting:

Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:97)
at scala.Option.map(Option.scala:146)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:97)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:94)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:94)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:93)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:93)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:92)
at scala.util.Either$RightProjection.flatMap(Either.scala:522)
at org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:92)
at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:186)
at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:168)
at org.apache.spark.streaming.kafka.KafkaCluster.getEarliestLeaderOffsets(KafkaCluster.scala:162)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:213)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:211)
at scala.util.Either$RightProjection.flatMap(Either.scala:522)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at hfu.lioncub.KafkaInitializer$.initializeKafka(KafkaInitializer.scala:22)
at hfu.lioncub.LioncubStream$.main(LioncubStream.scala:44)
at hfu.lioncub.LioncubStream.main(LioncubStream.scala)

It looks like a version conflict, but I can't figure it out. Any idea what I am doing wrong?

toiithl6 (answer 1)

I changed the Kafka dependency to "org.apache.kafka" % "kafka-clients" % "0.8.2.1" and removed "org.apache.kafka" %% "kafka" % "0.9.0.0". After that I had to fix a problem where the HDFS configuration could not be resolved, which I also solved by adding a merge strategy: case PathList("META-INF", "services", "org.apache.hadoop.fs.FileSystem") => MergeStrategy.filterDistinctLines. A sketch of the resulting build definition is shown below.
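
For reference, the root cause appears to be that spark-streaming-kafka-0-8 is compiled against the Kafka 0.8 client API, where a partition's leader is typed as kafka.cluster.Broker; Kafka 0.9 changed that type to kafka.cluster.BrokerEndPoint, which is exactly the cast that fails in KafkaCluster.findLeaders. Here is a sketch of the build definition after the changes described above, assuming sbt-assembly supplies PathList and MergeStrategy; the surrounding layout is an assumption, not part of the answer.

// build.sbt: Kafka artifacts aligned with the 0-8 connector
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.1.0",
  "org.apache.spark" %% "spark-streaming" % "2.1.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.1.0",
  // downgraded so the client classes match what the connector was compiled against;
  // the "org.apache.kafka" %% "kafka" % "0.9.0.0" dependency is removed entirely
  "org.apache.kafka" % "kafka-clients" % "0.8.2.1"
)

// sbt-assembly: concatenate the Hadoop FileSystem service registrations from
// the different jars instead of letting one overwrite the others
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", "services", "org.apache.hadoop.fs.FileSystem") =>
    MergeStrategy.filterDistinctLines
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

This works because Kafka brokers are backward compatible with older clients, so the 0.8.2.1 client can still talk to the 0.9.0.0 broker on the cluster.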
