Tech stack details -
Scala - 2.11.8
Spark - 2.4.4
Delta - 0.7.0
Running On - AWS EMR
Usage -
spark.readStream
.format("kinesis")
.option("streamName", kinesisConfs.streamName)
.option("region", kinesisConfs.kinesisRegion)
.option("initialPosition", "LATEST")
.option("maxFetchDuration", kinesisConfs.maxFetchDuration)
.option("shardsPerTask", kinesisConfs.shardsPerTask)
.option("fetchBufferSize", kinesisConfs.fetchBufferSize)
.option("minFetchPeriod", kinesisConfs.minFetchPeriod)
.option("shardFetchInterval", kinesisConfs.shardFetchInterval)
.option("maxFetchRate", kinesisConfs.maxFetchRate)
.load()
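For context, the stream read above is eventually written out; a minimal write-side sketch to a Delta table might look like the following (the checkpoint and output S3 paths are placeholders, not part of the original job):

```scala
// Sketch only: assumes the same spark session and kinesisConfs as above,
// and placeholder S3 paths for the checkpoint and the Delta table.
val query = spark.readStream
  .format("kinesis")
  .option("streamName", kinesisConfs.streamName)
  .option("region", kinesisConfs.kinesisRegion)
  .option("initialPosition", "LATEST")
  .load()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "s3://<bucket>/checkpoints/kinesis-to-delta") // placeholder
  .start("s3://<bucket>/delta/events") // placeholder

query.awaitTermination()
```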
Dependencies in pom.xml -
<dependencies>
<!-- SCALA -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>
<!-- SPARK -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-core_2.12</artifactId>
<version>0.7.0</version>
<scope>provided</scope>
</dependency>
<!-- JSON -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<!--
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
<version>${jackson.version}</version>
</dependency>
-->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>com.softwaremill.sttp</groupId>
<artifactId>core_${scala.binary.version}</artifactId>
<version>${sttp.version}</version>
</dependency>
<dependency>
<groupId>com.softwaremill.sttp</groupId>
<artifactId>json4s_${scala.binary.version}</artifactId>
<version>${sttp.version}</version>
</dependency>
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-jackson_${scala.binary.version}</artifactId>
<version>${json4s.version}</version>
</dependency>
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-ext_${scala.binary.version}</artifactId>
<version>${json4s.version}</version>
</dependency>
<!-- TEST -->
<dependency>
<groupId>org.scalactic</groupId>
<artifactId>scalactic_${scala.binary.version}</artifactId>
<version>3.0.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<version>3.0.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scoverage</groupId>
<artifactId>scalac-scoverage-plugin_${scala.binary.version}</artifactId>
<version>${scoverage.plugin.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.storm-enroute</groupId>
<artifactId>scalameter_${scala.binary.version}</artifactId>
<version>0.17</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.mongodb</groupId>
<artifactId>casbah_${scala.binary.version}</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah_${scala.binary.version}</artifactId>
<version>3.1.1</version>
<type>pom</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.awaitility/awaitility-scala -->
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility-scala</artifactId>
<version>3.1.6</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>....custom_dependency....</groupId>
<artifactId>....custom_dependency....</artifactId>
<version>2.0.0</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>....custom_dependency....</groupId>
<artifactId>....custom_dependency....</artifactId>
<version>1.4.0</version>
<scope>test</scope>
</dependency>
<!-- OTHERS -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>org.ocpsoft.prettytime</groupId>
<artifactId>prettytime</artifactId>
<version>4.0.1.Final</version>
</dependency>
<dependency>
<groupId>com.github.scopt</groupId>
<artifactId>scopt_${scala.binary.version}</artifactId>
<version>3.7.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-kinesis -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId>
<version>1.11.313</version>
<scope>provided</scope>
</dependency>
</dependencies>
Error -
Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kinesis. Please find packages at http://spark.apache.org/third-party-projects.html
...
Caused by: java.lang.ClassNotFoundException: kinesis.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:652)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:652)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:652)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:652)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)
... 40 more
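The last frames of the trace show how Spark resolves a format name: `DataSource.lookupDataSource` first scans the classpath for `DataSourceRegister` service entries whose short name matches "kinesis", and only then falls back to loading a literal `kinesis.DefaultSource` class, which is the lookup that fails here. In other words, a connector jar providing the source must be on the classpath and advertise itself through a service file roughly like this (the class name below is hypothetical):

```
# Inside the connector jar:
# META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
com.example.kinesis.DefaultSource
```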
Running the jar using -
spark-submit --packages io.delta:delta-core_2.12:0.7.0 /home/hadoop/<jar_name> <some_more_arguments>
The above code, in its current form, runs successfully on Databricks (I'm guessing because the Databricks runtime provides built-in support for it).
What I have tried so far -
To the pom.xml dependencies, I added -
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kinesis-asl_2.11</artifactId>
<version>2.4.4</version>
</dependency>
And ran it on EMR with -
spark-submit --packages io.delta:delta-core_2.12:0.7.0,org.apache.spark:spark-streaming-kinesis-asl_2.11:2.4.4 /home/hadoop/<Jar_name>.jar <Some_more_arguments>
But even that results in the same error. I'm not sure how to resolve this.
Update - 11/Feb/2021
Also tried spark-streaming-kinesis-asl_2.12:3.0.1, but got the same result.
Thanks in advance.
1 Answer
After adding the following dependency and providing it on the classpath, I was able to get past this issue.
Dependency ->
Do let me know if there is a better solution.