为什么spark submit无法找到kafka数据源,除非使用了--packages?

yyhrrdl8  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(577)

我正在尝试将Kafka集成到我的spark应用程序中,以下是我的pom文件必需的条目:

<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>${spark.stream.kafka.version}</version>
</dependency>
<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>${kafka.version}</version>
</dependency>

相应的工件版本为:

<kafka.version>0.10.2.0</kafka.version>
<spark.stream.kafka.version>2.2.0</spark.stream.kafka.version>

我一直在挠头:

Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html

我还试着给jar提供 --jars 参数,但是它没有帮助。我错过了什么?
代码:

private static void startKafkaConsumerStream() {

        Dataset<HttpPackage> ds1 = _spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", getProperty("kafka.bootstrap.servers"))
                .option("subscribe", HTTP_FED_VO_TOPIC)
                .load() // Getting the error here
                .as(Encoders.bean(HttpPackage.class));

        ds1.foreach((ForeachFunction<HttpPackage>)  req ->System.out.print(req));

    }

Spark定义为:

_spark = SparkSession
                .builder()
                .appName(_properties.getProperty("app.name"))
                .config("spark.master", _properties.getProperty("master"))
                .config("spark.es.nodes", _properties.getProperty("es.hosts"))
                .config("spark.es.port", _properties.getProperty("es.port"))
                .config("spark.es.index.auto.create", "true")
                .config("es.net.http.auth.user", _properties.getProperty("es.net.http.auth.user"))
                .config("es.net.http.auth.pass", _properties.getProperty("es.net.http.auth.pass"))
                .getOrCreate();

我的进口是:

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

但是,当我运行我的代码时,如这里所述,这是与package选项一起运行的:

--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0

它起作用了

qlvxas9a

qlvxas9a1#

spark structured streaming支持使用外部kafka-0-10-sql模块将apache kafka作为流源和汇。 kafka-0-10-sql 模块不可用于激发使用提交以执行的应用程序 spark-submit . 该模块是外部的,要使其可用,您应该将其定义为依赖项。
除非你使用 kafka-0-10-sql spark应用程序中特定于模块的代码您不必将模块定义为 dependencypom.xml . 因为没有代码使用模块的代码,所以不需要对模块进行编译依赖。您可以根据接口编写代码,这也是sparksql如此易于使用的原因之一(也就是说,只需要很少的代码就可以拥有相当复杂的分布式应用程序)。 spark-submit 但是需要 --packages 您报告的命令行选项运行良好。
但是,当我运行我的代码时,如这里所述,这是与package选项一起运行的:

--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0

它能正常工作的原因 --packages 你必须告诉spark infrastructure在哪里可以找到 kafka 格式。
这将我们引向另一个“问题”(或需求),即使用kafka运行流式spark应用程序。必须指定对的运行时依赖关系 spark-sql-kafka 模块。
使用指定运行时依赖项 --packages 命令行选项(该选项在您之后下载必要的jar) spark-submit 或者创建一个所谓的uberjar(或者一个胖jar)。
那就是 pom.xml 来玩(这就是为什么人们在游戏中提供帮助 pom.xml 模块作为一个 dependency ).
因此,首先,您必须指定 pom.xml .

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

最后但并非最不重要的一点是,您必须构建一个在中配置的uberjar pom.xml 使用ApacheMavenShade插件。
使用apachemaven shade插件,您可以创建一个uberjar,其中包含所有的“基础设施” kafka 格式化工作,在spark应用程序jar文件中。事实上,uberjar将包含所有必需的运行时依赖项,因此您可以 spark-submit 只有jar(没有 --packages 选项或类似选项)。

bnlyeluc

bnlyeluc2#

将下面的依赖项添加到 pom.xml 文件。

<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.2.0</version>
</dependency>
qzlgjiam

qzlgjiam3#

更新依赖项和版本。下面给出的依赖项应该可以正常工作:

<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.1</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>

注意:在前两个依赖项中提供了范围。

相关问题