sql-kafka-0-10_2.11:2.1.0依赖关系在maven pom文件中工作?

jgzswidk  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(378)

这个问题之前已经在这里讨论过了,但是在撰写本文的时候,我没有足够的声誉对algomeisters解决方案发表评论(它最终不适合我)
我有一个Spark工作,使用Kafka和结构化流媒体。因此它要求我对 spark-sql-kafka-0-10 模块。
jaceklaskowski说过,您必须在spark submit命令行选项中包含这个包
对kafka的结构化流式支持位于单独的spark-sql-kafka-0-10模块(aka library dependency)中。
默认情况下不包括spark-sql-kafka-0-10模块,因此您必须使用--packages命令行选项启动spark submit(以及类似spark shell的“派生工具”来“安装”它。
这是我做的,下面是我的Spark提交

SPARK_KAFKA_VERSION=0.10 spark2-submit \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 \
--class MyApp.MainClassPath \
--master local[4] \
MySparkApp-0.0.1-jar-with-dependencies.jar

但是,我不认为这是一个好的选择,每次我运行这个jar它都必须重新下载依赖项。如果由于某种原因,此依赖项不可用,我的应用程序将不再运行。我使用maven作为包管理器,我在pom文件中有这个包,但是它不工作。

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.1.0</version>
    <scope>runtime</scope>
</dependency>

当我尝试运行spark作业并删除“--packages”选项时,会出现以下错误

Exception in thread "main" java.lang.ClassNotFoundException: 
Failed to find data source: kafka. Please find packages at 
http://spark.apache.org/third-party-projects.html

这是由

Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource

因此,我尝试使用maven shade插件包含algomeister提供的解决方案,但是它不起作用。我得到下面的错误!

Exception in thread "main" java.lang.SecurityException: Invalid signature file digest for Manifest main attributes
        at sun.security.util.SignatureFileVerifier.processImpl(SignatureFileVerifier.java:330)
        at sun.security.util.SignatureFileVerifier.process(SignatureFileVerifier.java:263)
        at java.util.jar.JarVerifier.processEntry(JarVerifier.java:318)
        at java.util.jar.JarVerifier.update(JarVerifier.java:230)
        at java.util.jar.JarFile.initializeVerifier(JarFile.java:383)
        at java.util.jar.JarFile.getInputStream(JarFile.java:450)
        at sun.misc.URLClassPath$JarLoader$2.getInputStream(URLClassPath.java:977)
        at sun.misc.Resource.cachedInputStream(Resource.java:77)
        at sun.misc.Resource.getByteBuffer(Resource.java:160)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:454)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.spark.util.Utils$.classForName(Utils.scala:229)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:695)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

如果我们能找到一个可行的解决方案那就太好了。谢谢您。

v1uwarro

v1uwarro1#

在maven文件下面使用shade插件。示例类来自 spark-2.2.0 示例-javastructuredkafcawordcount。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>structured-kafka-word-count</groupId>
<artifactId>structured-kafka-word-count</artifactId>
<version>1.0.0</version>
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.0.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                 <resource>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</resource>
                            </transformer>
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>org.apache.spark.examples.sql.streaming.JavaStructuredKafkaWordCount</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

你可以用 spark-submit 命令如下。

spark-submit --class org.apache.spark.examples.sql.streaming.JavaStructuredKafkaWordCount --master local[2] structured-kafka-word-count-1.0.0.jar localhost:9092 subscribe mytopic

相关问题