我有一些问题,而试图阅读Kafka与Spark流。
我的代码是:
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaIngestor")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val kafkaParams = Map[String, String](
"zookeeper.connect" -> "localhost:2181",
"group.id" -> "consumergroup",
"metadata.broker.list" -> "localhost:9092",
"zookeeper.connection.timeout.ms" -> "10000"
//"kafka.auto.offset.reset" -> "smallest"
)
val topics = Set("test")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
我以前在端口2181启动zookeeper,在端口9092启动kafka服务器0.9.0.0。但是我在spark驱动程序中得到了以下错误:
Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
at scala.Option.map(Option.scala:145)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
Zookeeper日志:
[2015-12-08 00:32:08,226] INFO Got user-level KeeperException when processing sessionid:0x1517ec89dfd0000 type:create cxid:0x34 zxid:0x1d3 txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor)
有什么提示吗?
非常感谢你
2条答案
按热度按时间2eafrhcq1#
问题与Kafka版本错误有关。
如文件所述
Kafka:spark streaming 1.5.2与Kafka0.8.2.1兼容
所以,包括
在我的pom.xml中(而不是版本0.9.0.0)解决了这个问题。
希望这有帮助
uajslkp62#
kafka10流媒体/spark 2.1.0/dcos/中间层
我花了一整天的时间在这上面,这篇文章一定读了十几遍。我试过spark 2.0.0,2.0.1,Kafka8,Kafka10。远离Kafka8和spark 2.0.x,依赖就是一切。从下面开始。它起作用了。
sbt公司:
工作Kafka/Spark流代码:
请喜欢,如果你看到这个,我可以得到一些声誉点。:)