Integrating Spark 1.6.1 with Kafka 0.8.2.1

atmip9wb · posted 2021-06-08 in Kafka

I am trying to integrate Spark 1.6.1 with kafka_2.10-0.8.2.1 / kafka_2.10-0.9.0.1.
It fails for the direct stream shown below:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// ssc is the StreamingContext created elsewhere in the job
val kafkaStreams = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc,
      Map("group.id" -> "group",
        "auto.offset.reset" -> "smallest",
        "metadata.broker.list" -> "127.0.0.1:9092",
        "bootstrap.servers" -> "127.0.0.1:9092"),
      Set("tweets"))

The exception is:

Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
        at scala.Option.map(Option.scala:145)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:87)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:86)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:86)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:85)
        at scala.util.Either$RightProjection.flatMap(Either.scala:523)
        at org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:85)
        at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:179)
        at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:161)
        at org.apache.spark.streaming.kafka.KafkaCluster.getLatestLeaderOffsets(KafkaCluster.scala:150)
        at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:215)
        at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:211)
        at scala.util.Either$RightProjection.flatMap(Either.scala:523)
        at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)

I went through the link "kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker", which mentions that Kafka 0.9 is not compatible with Spark 1.6.1 and suggests using Kafka 0.8.2.1, but we still face the same issue.
Environment: Scala 2.10.3, Spark 1.6.1, Kafka (0.8/0.9)

Library dependency
        "org.apache.spark" % "spark-core_2.10" % "1.6.1",
        "org.apache.spark" % "spark-sql_2.10" % "1.6.1",
        "org.apache.spark" % "spark-streaming_2.10" % "1.6.1",
        "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.1",
        "org.apache.kafka" %% "kafka" % "0.8.0.1"  

    Please let me know if you find anything inappropriate. Thanks in advance.

3bygqnnd1#

I used Confluent, which is a wrapper around Kafka, to resolve the issue. Confluent provides a simple API and extended features to support Avro cleanly. It provides a Schema Registry to store multiple versions of an Avro schema, so there is no need to pass the Avro schema from the Kafka producer to the Kafka consumer; Confluent handles it.
    For more details and features, please visit https://www.confluent.io/
    I used Confluent 2, which is available at https://www.confluent.io/download/
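
    As a rough illustration of that flow (a sketch only; the Schema Registry URL, topic name, and Avro schema below are placeholder assumptions), a producer configured with Confluent's KafkaAvroSerializer registers the writer schema with the registry and ships just a schema id with each record:

     import java.util.Properties
     import org.apache.avro.Schema
     import org.apache.avro.generic.GenericData
     import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

     val props = new Properties()
     props.put("bootstrap.servers", "127.0.0.1:9092")
     props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
     props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
     props.put("schema.registry.url", "http://127.0.0.1:8081")  // assumed local Schema Registry

     // Minimal Avro record; the consumer never needs this schema handed to it,
     // it fetches the registered schema from the registry by id.
     val schema = new Schema.Parser().parse(
       """{"type":"record","name":"Tweet","fields":[{"name":"text","type":"string"}]}""")
     val tweet = new GenericData.Record(schema)
     tweet.put("text", "hello")

     val producer = new KafkaProducer[AnyRef, AnyRef](props)
     producer.send(new ProducerRecord[AnyRef, AnyRef]("tweets", tweet))
     producer.close()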

Library Dependency
     libraryDependencies ++= Seq(
       "io.confluent" % "kafka-avro-serializer" % "2.0.0",
       "org.apache.spark" % "spark-streaming_2.11" % "1.6.1" % "provided"
     )
     resolvers ++= Seq(
       Resolver.sonatypeRepo("public"),
       "Confluent Maven Repo" at "http://packages.confluent.io/maven/"
     )

    Code sample
     val dStream: InputDStream[ConsumerRecord[String, GenericRecord]] =
       KafkaUtils.createDirectStream[String, GenericRecord](
         streamingContext,
         PreferConsistent,
         Subscribe[String, GenericRecord](topics, kafkaParams))

    You can then iterate over dStream and apply your business logic, as sketched below.
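
    A minimal sketch of that iteration (assuming the dStream and streamingContext defined above, and that each value arrives as an Avro GenericRecord):

     dStream.foreachRDD { rdd =>
       rdd.foreach { record =>
         // record is a ConsumerRecord[String, GenericRecord]; business logic goes here
         println(s"key=${record.key()} value=${record.value()}")
       }
     }
     streamingContext.start()
     streamingContext.awaitTermination()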
