Caused by: java.lang.ClassNotFoundException: kinesis.DefaultSource

cedebl8k  posted on 2021-07-13  in  Spark

Tech stack details -

Scala - 2.11.8
Spark - 2.4.4
Delta - 0.7.0
Running On - AWS EMR

Usage -

spark.readStream
      .format("kinesis")
      .option("streamName", kinesisConfs.streamName)
      .option("region", kinesisConfs.kinesisRegion)
      .option("initialPosition", "LATEST")
      .option("maxFetchDuration", kinesisConfs.maxFetchDuration)
      .option("shardsPerTask", kinesisConfs.shardsPerTask)
      .option("fetchBufferSize", kinesisConfs.fetchBufferSize)
      .option("minFetchPeriod", kinesisConfs.minFetchPeriod)
      .option("shardFetchInterval", kinesisConfs.shardFetchInterval)
      .option("maxFetchRate", kinesisConfs.maxFetchRate)
      .load()

Dependencies in pom.xml -

<dependencies>
        <!--  SCALA  -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>provided</scope>
        </dependency>
        <!--  SPARK  -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-core_2.12</artifactId>
            <version>0.7.0</version>
            <scope>provided</scope>
        </dependency>
        <!--  JSON  -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <!--
        <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.5</version>
        </dependency>
        <dependency>
            <groupId>com.softwaremill.sttp</groupId>
            <artifactId>core_${scala.binary.version}</artifactId>
            <version>${sttp.version}</version>
        </dependency>
        <dependency>
            <groupId>com.softwaremill.sttp</groupId>
            <artifactId>json4s_${scala.binary.version}</artifactId>
            <version>${sttp.version}</version>
        </dependency>
        <dependency>
            <groupId>org.json4s</groupId>
            <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
            <version>${json4s.version}</version>
        </dependency>
        <dependency>
            <groupId>org.json4s</groupId>
            <artifactId>json4s-ext_${scala.binary.version}</artifactId>
            <version>${json4s.version}</version>
        </dependency>

        <!--  TEST  -->
        <dependency>
            <groupId>org.scalactic</groupId>
            <artifactId>scalactic_${scala.binary.version}</artifactId>
            <version>3.0.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest_${scala.binary.version}</artifactId>
            <version>3.0.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.scoverage</groupId>
            <artifactId>scalac-scoverage-plugin_${scala.binary.version}</artifactId>
            <version>${scoverage.plugin.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.storm-enroute</groupId>
            <artifactId>scalameter_${scala.binary.version}</artifactId>
            <version>0.17</version>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.mongodb</groupId>
                    <artifactId>casbah_${scala.binary.version}</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>casbah_${scala.binary.version}</artifactId>
            <version>3.1.1</version>
            <type>pom</type>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.awaitility/awaitility-scala -->
        <dependency>
            <groupId>org.awaitility</groupId>
            <artifactId>awaitility-scala</artifactId>
            <version>3.1.6</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>....custom_dependency....</groupId>
            <artifactId>....custom_dependency....</artifactId>
            <version>2.0.0</version>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>com.google.code.gson</groupId>
                    <artifactId>gson</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>....custom_dependency....</groupId>
            <artifactId>....custom_dependency....</artifactId>
            <version>1.4.0</version>
            <scope>test</scope>
        </dependency>

        <!--  OTHERS  -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-text</artifactId>
            <version>1.6</version>
        </dependency>
        <dependency>
            <groupId>org.ocpsoft.prettytime</groupId>
            <artifactId>prettytime</artifactId>
            <version>4.0.1.Final</version>
        </dependency>
        <dependency>
            <groupId>com.github.scopt</groupId>
            <artifactId>scopt_${scala.binary.version}</artifactId>
            <version>3.7.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-kinesis -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-kinesis</artifactId>
            <version>1.11.313</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

Error -

Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kinesis. Please find packages at http://spark.apache.org/third-party-projects.html
.
.
.
.
.
.
.
.
.
Caused by: java.lang.ClassNotFoundException: kinesis.DefaultSource
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:652)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:652)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:652)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:652)
    at scala.util.Try.orElse(Try.scala:84)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)
    ... 40 more
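
For context, the class name `kinesis.DefaultSource` is not something the job asks for directly. Judging by the stack trace (`Try.orElse` inside `lookupDataSource`), when no registered data source matches the short name, Spark 2.4 falls back to loading the format string as a class name, and then the same string with `.DefaultSource` appended; the failure of that last attempt is what gets reported. The following is a minimal, Spark-free sketch of that fallback, not Spark's actual code. `LookupSketch` and `resolve` are hypothetical names made up for illustration.

```scala
import scala.util.Try

// Hypothetical, simplified imitation of the class-name fallback;
// the real logic lives in Spark's DataSource.lookupDataSource.
object LookupSketch {
  // First try the format string itself as a class name,
  // then the same string with ".DefaultSource" appended.
  def resolve(provider: String, loader: ClassLoader): Try[Class[_]] =
    Try[Class[_]](loader.loadClass(provider))
      .orElse(Try[Class[_]](loader.loadClass(provider + ".DefaultSource")))

  def main(args: Array[String]): Unit = {
    val loader = ClassLoader.getSystemClassLoader
    // No class named "kinesis" or "kinesis.DefaultSource" exists: Failure.
    println(resolve("kinesis", loader))
    // A fully qualified class name that exists resolves on the first attempt.
    println(resolve("java.lang.String", loader))
  }
}
```

Running `main` prints a `Failure` for `kinesis` and a `Success` for `java.lang.String`. In other words, the error means that nothing on the classpath answers to the name `kinesis`, not that a class called `kinesis.DefaultSource` is supposed to exist.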

Running the jar with -

spark-submit --packages io.delta:delta-core_2.12:0.7.0 /home/hadoop/<jar_name> <some_more_arguments>

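The above code, in its current form, runs successfully on Databricks (I guess that is because the Databricks runtime provides built-in support).
What I have done so far -
In the pom.xml dependencies, I added -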

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kinesis-asl_2.11</artifactId>
            <version>2.4.4</version>
        </dependency>

And tried it on EMR with -

spark-submit --packages io.delta:delta-core_2.12:0.7.0,org.apache.spark:spark-streaming-kinesis-asl_2.11:2.4.4 /home/hadoop/<Jar_name>.jar  <Some_more_arguments>

But even this results in the same error. I don't know how to solve this.
Update - 11/Feb/2021
Also tried spark-streaming-kinesis-asl_2.12:3.0.1, but with the same result.
Thanks in advance.

cnh2zyt3 #1

After adding the following dependency and making it available on the classpath, I was able to mitigate the issue.
Dependency ->

        <dependency>
            <groupId>com.qubole.spark</groupId>
            <artifactId>spark-sql-kinesis_2.11</artifactId>
            <version>1.2.0_spark-2.4</version>
        </dependency>

If there is a better solution, please let me know.
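
A possible reason this dependency helps where `spark-streaming-kinesis-asl` did not: as far as I know, the ASL artifact only provides the older DStream-based receiver and does not register a Structured Streaming source under the short name `kinesis`. A Structured Streaming connector announces itself through a `META-INF/services/org.apache.spark.sql.sources.DataSourceRegister` file in its jar. The sketch below lists every implementation class registered that way on the current classpath, which is one way to check whether the connector jar actually reached the driver. The names `DataSourceRegistrations`, `parse` and `registered` are hypothetical helpers, not part of Spark or the connector.

```scala
import scala.collection.JavaConverters._
import scala.io.Source

// Hypothetical diagnostic helper: reads the service files directly,
// so it needs no Spark classes on the classpath to compile or run.
object DataSourceRegistrations {
  val ServiceFile =
    "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister"

  // Parse the text of one service file: strip '#' comments and blank lines,
  // leaving one fully qualified class name per entry.
  def parse(text: String): Seq[String] =
    text.split("\n").toSeq
      .map(_.takeWhile(_ != '#').trim)
      .filter(_.nonEmpty)

  // Collect the entries of every matching service file visible to the loader.
  def registered(loader: ClassLoader): Seq[String] =
    loader.getResources(ServiceFile).asScala.toList.flatMap { url =>
      val src = Source.fromURL(url, "UTF-8")
      try parse(src.mkString) finally src.close()
    }

  def main(args: Array[String]): Unit =
    registered(ClassLoader.getSystemClassLoader).foreach(println)
}
```

If no Kinesis-related class shows up in the output, `.format("kinesis")` has nothing to resolve to. Note also that the option names used in the question come from the Databricks connector and may not all be recognized by this one.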
