structuredstreamingkafka2.1->zeppelin0.8->spark2.4:spark不使用jar

bxgwgixi  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(406)

我有一个kafka2.1消息代理,希望对spark2.4中的消息数据进行一些处理。我想使用齐柏林飞艇0.8.1笔记本进行快速原型制作。
我下载了spark-streaming-kafka-0-10_.11.jar,这是结构化流媒体所必需的(http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)并将其作为“依赖工件”添加到齐柏林飞艇的“spark”解释器中(该解释器也处理%pyspark段落)。我重新启动了这个解释器(还有齐柏林飞艇)。
我还将jar加载到笔记本的第一段中(我首先认为这不必要……):

  1. %dep z.load("/usr/local/analyse/jar/spark-streaming-kafka-0-10_2.11.jar")
  2. res0: org.apache.zeppelin.dep.Dependency = org.apache.zeppelin.dep.Dependency@2b65d5

所以,我没有出错,所以加载似乎正常。现在,我要做测试,kafka服务器使用这个端口在同一台机器上运行,还有一个主题“测试”:

  1. %pyspark
  2. # Subscribe to a topic
  3. df = spark \
  4. .readStream \
  5. .format("kafka") \
  6. .option("kafka.bootstrap.servers", "localhost:9092") \
  7. .option("subscribe", "test") \
  8. .load()

但我得到了错误
无法执行第6行:.option(“subscribe”,“test”)\traceback(最近一次调用):file“/usr/local/analysis/spark/python/lib/pyspark.zip/pyspark/sql/utils.py”,第63行,deco return f(*a,**kw)file“/usr/local/analysis/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py”,第328行,格式为get\u return value(target\u id,“.”,name),value)py4j.protocol.py4jjavaerror:调用o120.load时出错:org.apache.spark.sql.analysisexception:找不到数据源:kafka。请按照“结构化流媒体+Kafka集成指南”的部署部分部署应用程序。;在org.apache.spark.sql.execution.datasources.datasource$.lookupdateasource(datasource。scala:652)在org.apache.spark.sql.streaming.datastreamreader.load(datastreamreader。scala:161)在sun.reflect.nativemethodaccessorimpl.invoke0(本机方法)在sun.reflect.nativemethodaccessorimpl.invoke(nativemethodaccessorimpl)。java:62)在sun.reflect.delegatingmethodaccessorimpl.invoke(delegatingmethodaccessorimpl。java:43)在java.lang.reflect.method.invoke(方法。java:498)在py4j.reflection.methodinvoker.invoke(methodinvoker。java:244)在py4j.reflection.reflectionengine.invoke(reflectionengine。java:357)在py4j.gateway.invoke(gateway。java:282)在py4j.commands.abstractcommand.invokemethod(abstractcommand。java:132)在py4j.commands.callcommand.execute(callcommand。java:79)在py4j.gatewayconnection.run(网关连接。java:238)在java.lang.thread.run(线程。java:748)
在处理上述异常时,发生了另一个异常:
回溯(最后一次调用):文件“/tmp/zeppelin\u pyspark-312826888257172599.py”,第380行,在exec(code,\zcuserquerynamespace)文件“,”第6行,在文件“/usr/local/analyze/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py”第400行,在load return self.\u df(self.\u jreader.load())file“/usr/local/analysis/spark/python/lib/py4j-0.10.7-src.zip/py4j/java\u gateway.py”中,第1257行,在call answer、self.gateway\u client、self.target\u id、self.name)file“/usr/local/analysis/spark/python/lib/pyspark.zip/pyspark/sql/utils.py”中,第69行,在deco中引发analysisexception(s.split(,stacktrace)pyspark.sql.utils.analysisexception:'找不到数据源:kafka。请按照“结构化流媒体+Kafka集成指南”的部署部分部署应用程序
我想知道,至少有一个调整(解释器配置或直接加载)应该有效。
我还在控制台上尝试了spark submit--jar/usr/local/analyze/jar/spark-streaming-kafka-0-10_.11.jar,但这似乎只有在提交程序时才起作用。
因此,我还将spark-streaming-kafka-0-10_.11.jar复制到/usr/local/analyze/spark/jars/spark的所有其他jar都在这里。但在重启(spark和齐柏林飞艇)之后,我总是会遇到同样的错误。
同时我发现我可以在webbrowser中查看spark的环境变量,在那里我在“classpath entries”部分找到spark-streaming-kafka-0-10_.11.jar,源代码是“system classpath”,也可以是“added by user”(似乎是齐柏林飞艇解释器部分的工件)。看来我的前两次尝试应该奏效了。

cyej8jka

cyej8jka1#

第一个问题是您已经下载了spark streaming的包,但是尝试创建一个结构化的streaming对象(使用 readstream() ). 请记住,spark流和spark结构化流是两个不同的东西,需要区别对待。
对于结构化流媒体,您需要下载spark-sql-kafka-0-10_.11包及其依赖项kafka clients、slf4j api、snappy java、lz4 java和unused。依赖项部分应如下所示以加载所有必需的包:

  1. z.load("/tmp/spark-sql-kafka-0-10_2.11-2.4.0.jar")
  2. z.load("/tmp/kafka-clients-2.0.0.jar")
  3. z.load("/tmp/lz4-java-1.4.0.jar")
  4. z.load("/tmp/snappy-java-1.1.7.1.jar")
  5. z.load("/tmp/unused-1.0.0.jar")
  6. z.load("/tmp/slf4j-api-1.7.16.jar")

相关问题