这个问题之前已经在这里讨论过了,但是在撰写本文的时候,我没有足够的声誉对algomeisters解决方案发表评论(它最终不适合我)
我有一个Spark工作,使用Kafka和结构化流媒体。因此它要求我对 spark-sql-kafka-0-10
模块。
jaceklaskowski说过,您必须在spark submit命令行选项中包含这个包
对kafka的结构化流式支持位于单独的spark-sql-kafka-0-10模块(aka library dependency)中。
默认情况下不包括spark-sql-kafka-0-10模块,因此您必须使用--packages命令行选项启动spark submit(以及类似spark shell的“派生工具”来“安装”它。
这是我做的,下面是我的Spark提交
SPARK_KAFKA_VERSION=0.10 spark2-submit \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 \
--class MyApp.MainClassPath \
--master local[4] \
MySparkApp-0.0.1-jar-with-dependencies.jar
但是,我不认为这是一个好的选择,每次我运行这个jar它都必须重新下载依赖项。如果由于某种原因,此依赖项不可用,我的应用程序将不再运行。我使用maven作为包管理器,我在pom文件中有这个包,但是它不工作。
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.1.0</version>
<scope>runtime</scope>
</dependency>
当我尝试运行spark作业并删除“--packages”选项时,会出现以下错误
Exception in thread "main" java.lang.ClassNotFoundException:
Failed to find data source: kafka. Please find packages at
http://spark.apache.org/third-party-projects.html
这是由
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
因此,我尝试使用maven shade插件包含algomeister提供的解决方案,但是它不起作用。我得到下面的错误!
Exception in thread "main" java.lang.SecurityException: Invalid signature file digest for Manifest main attributes
at sun.security.util.SignatureFileVerifier.processImpl(SignatureFileVerifier.java:330)
at sun.security.util.SignatureFileVerifier.process(SignatureFileVerifier.java:263)
at java.util.jar.JarVerifier.processEntry(JarVerifier.java:318)
at java.util.jar.JarVerifier.update(JarVerifier.java:230)
at java.util.jar.JarFile.initializeVerifier(JarFile.java:383)
at java.util.jar.JarFile.getInputStream(JarFile.java:450)
at sun.misc.URLClassPath$JarLoader$2.getInputStream(URLClassPath.java:977)
at sun.misc.Resource.cachedInputStream(Resource.java:77)
at sun.misc.Resource.getByteBuffer(Resource.java:160)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:454)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:229)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:695)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
如果我们能找到一个可行的解决方案那就太好了。谢谢您。
1条答案
按热度按时间v1uwarro1#
在maven文件下面使用shade插件。示例类来自
spark-2.2.0
示例-javastructuredkafcawordcount。你可以用
spark-submit
命令如下。