我发现在spark streaming(spark 2.0)中使用Kafka主题有两种方法:
1) 使用 KafkaUtils.createDirectStream
要每k秒获取一次数据流,请参阅此文档
2) 使用 kafka: sqlContext.read.format(“json”).stream(“kafka://KAFKA_HOST”)
为spark 2.0的新特性:结构化流媒体创建一个无限Dataframe,相关文档在这里
方法1)有效,但2)无效,我得到了以下错误
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.DataFrameReader.stream(Ljava/lang/String;)Lorg/apache/spark/sql/Dataset;
...
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
我的问题是:
这是什么 “kafka://KAFKA_HOST”
指的是什么?
我该如何解决这个问题?
提前谢谢!
1条答案
按热度按时间4smxwvx51#
spark 2.0还不支持kafka作为无限Dataframe/集的来源。计划在2.1中添加支持
编辑:(2016年12月6日)
Kafka0.10现在已在spark 2.0.2中过期: