I'm having a problem with Spark Streaming 2.1.0, Scala 2.11.8, and Kafka 0.9.0. These are my dependencies:
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.1.0",
"org.apache.spark" %% "spark-streaming" % "2.1.0",
"org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.1.0",
"org.apache.kafka" % "kafka-clients" % "0.9.0.0",
"org.apache.kafka" %% "kafka" % "0.9.0.0"
),
This is my Spark Streaming code that connects to the Kafka server:
def initializeKafka(ssc: StreamingContext, topic: String): InputDStream[(String, String)] = {
  val lioncubConf = Configuration
  val kafkaParams = Map[String, String](
    "metadata.broker.list" -> "debian:9092",
    "group.id" -> "222",
    "auto.offset.reset" -> "smallest",
    "enable.auto.commit" -> "false")
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set(topic))
}
I'm running kafka_2.11-0.9.0.0 on a Debian VM that I have listed in my /etc/hosts. There is only one broker. The topic was created following the Kafka quickstart (https://kafka.apache.org/090/documentation/#quickstart). I have to use this version of Kafka because it is the version running on the cluster where I have to deploy the software.
I'm getting the following error:
Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:97)
at scala.Option.map(Option.scala:146)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:97)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:94)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:94)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:93)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:93)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:92)
at scala.util.Either$RightProjection.flatMap(Either.scala:522)
at org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:92)
at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:186)
at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:168)
at org.apache.spark.streaming.kafka.KafkaCluster.getEarliestLeaderOffsets(KafkaCluster.scala:162)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:213)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:211)
at scala.util.Either$RightProjection.flatMap(Either.scala:522)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at hfu.lioncub.KafkaInitializer$.initializeKafka(KafkaInitializer.scala:22)
at hfu.lioncub.LioncubStream$.main(LioncubStream.scala:44)
at hfu.lioncub.LioncubStream.main(LioncubStream.scala)
It looks like a version conflict, but I can't figure it out. Any idea what I'm doing wrong?
1 Answer
I changed the Kafka client version to "org.apache.kafka" % "kafka-clients" % "0.8.2.1" and removed "org.apache.kafka" %% "kafka" % "0.9.0.0" entirely.
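Under that change, the adjusted dependency block would look roughly like this (a sketch: the Spark and Scala versions are taken from the question, and the reasoning in the comment is an inference, since spark-streaming-kafka-0-8 is built against the 0.8-era Kafka consumer API):

```scala
// Sketch of the adjusted build.sbt dependencies. The 0.8-era Kafka client is
// pinned so it matches what spark-streaming-kafka-0-8 expects, and the
// "kafka" broker artifact is dropped so its 0.9 classes (e.g. BrokerEndPoint)
// no longer shadow the ones Spark's connector was compiled against.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.1.0",
  "org.apache.spark" %% "spark-streaming" % "2.1.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.1.0",
  "org.apache.kafka" % "kafka-clients" % "0.8.2.1"
)
```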
Then I had to fix an issue where the HDFS configuration could not be resolved, which I also solved by adding a merge strategy: case PathList("META-INF", "services", "org.apache.hadoop.fs.FileSystem") => MergeStrategy.filterDistinctLines
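For context, here is a minimal sketch of where that rule could sit in an sbt-assembly configuration. The fallback case is an assumption (standard sbt-assembly boilerplate), not part of the original answer:

```scala
// Sketch of an sbt-assembly merge strategy (requires the sbt-assembly plugin).
// Hadoop FileSystem implementations register themselves in the service file
// META-INF/services/org.apache.hadoop.fs.FileSystem; concatenating the distinct
// lines from all jars keeps every registration (including HDFS) in the fat jar.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", "services", "org.apache.hadoop.fs.FileSystem") =>
    MergeStrategy.filterDistinctLines
  case x =>
    // Assumed fallback: delegate everything else to the previous strategy.
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}
```

Without a rule like this, the default strategy keeps only one jar's copy of the service file, so the hdfs:// scheme registration can be lost from the assembled jar.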