我有Spark应用程序。我的build.sbt看起来像
name := "IngestFromS3ToKafka"
version := "1.0"
scalaVersion := "2.12.17"
resolvers += "confluent" at "https://packages.confluent.io/maven/"
val sparkVersion = "3.3.1"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.apache.hadoop" % "hadoop-common" % "3.3.5" % "provided",
"org.apache.hadoop" % "hadoop-aws" % "3.3.5" % "provided",
"com.amazonaws" % "aws-java-sdk-bundle" % "1.12.475" % "provided",
"org.apache.spark" %% "spark-avro" % sparkVersion,
"org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,
"io.delta" %% "delta-core" % "2.4.0",
"za.co.absa" %% "abris" % "6.3.0"
)
ThisBuild / assemblyMergeStrategy := {
// https://stackoverflow.com/a/67937671/2000548
case PathList("module-info.class") => MergeStrategy.discard
case x if x.endsWith("/module-info.class") => MergeStrategy.discard
// https://stackoverflow.com/a/76129963/2000548
case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
case x =>
val oldStrategy = (ThisBuild / assemblyMergeStrategy).value
oldStrategy(x)
}
我有一个Amazon EMR集群,里面有Spark 3.3.1。
当我spark-submit
到亚马逊电子病历,我得到了错误
/bin/bash -c "/usr/bin/spark-submit --master yarn --deploy-mode client --class com.hongbomiao.IngestFromS3ToKafka --name ingest-from-s3-to-kafka /home/hadoop/IngestFromS3ToKafka-assembly-1.0.jar"
23/05/31 22:35:31 INFO SparkContext: Running Spark version 3.3.1-amzn-0
# ...
Exception in thread "main" com.google.common.util.concurrent.ExecutionError: java.lang.NoClassDefFoundError: org/apache/spark/sql/catalyst/FileSourceOptions$
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2261)
at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
at org.apache.spark.sql.delta.DeltaLog$.getDeltaLogFromCache$1(DeltaLog.scala:801)
at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:811)
at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:715)
at org.apache.spark.sql.delta.DeltaLog$.forTable(DeltaLog.scala:663)
at org.apache.spark.sql.delta.DeltaLog$.$anonfun$forTableWithSnapshot$1(DeltaLog.scala:720)
at org.apache.spark.sql.delta.DeltaLog$.withFreshSnapshot(DeltaLog.scala:753)
at org.apache.spark.sql.delta.DeltaLog$.forTableWithSnapshot(DeltaLog.scala:720)
at org.apache.spark.sql.delta.sources.DeltaDataSource.sourceSchema(DeltaDataSource.scala:91)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:236)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:34)
at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:168)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:210)
at com.hongbomiao.IngestFromS3ToKafka$.main(IngestFromS3ToKafka.scala:29)
at com.hongbomiao.IngestFromS3ToKafka.main(IngestFromS3ToKafka.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1006)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1095)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1104)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NoClassDefFoundError: org/apache/spark/sql/catalyst/FileSourceOptions$
at org.apache.spark.sql.delta.DeltaLog.indexToRelation(DeltaLog.scala:181)
at org.apache.spark.sql.delta.DeltaLog.loadIndex(DeltaLog.scala:196)
at org.apache.spark.sql.delta.Snapshot.$anonfun$protocolAndMetadataReconstruction$1(Snapshot.scala:210)
at scala.collection.immutable.List.map(List.scala:293)
at org.apache.spark.sql.delta.Snapshot.protocolAndMetadataReconstruction(Snapshot.scala:210)
at org.apache.spark.sql.delta.Snapshot.x$1$lzycompute(Snapshot.scala:134)
at org.apache.spark.sql.delta.Snapshot.x$1(Snapshot.scala:129)
at org.apache.spark.sql.delta.Snapshot._metadata$lzycompute(Snapshot.scala:129)
at org.apache.spark.sql.delta.Snapshot._metadata(Snapshot.scala:129)
at org.apache.spark.sql.delta.Snapshot.metadata(Snapshot.scala:197)
at org.apache.spark.sql.delta.Snapshot.toString(Snapshot.scala:390)
at java.lang.String.valueOf(String.java:2994)
at java.lang.StringBuilder.append(StringBuilder.java:136)
at org.apache.spark.sql.delta.Snapshot.$anonfun$new$1(Snapshot.scala:393)
at org.apache.spark.sql.delta.Snapshot.$anonfun$logInfo$1(Snapshot.scala:370)
at org.apache.spark.internal.Logging.logInfo(Logging.scala:61)
at org.apache.spark.internal.Logging.logInfo$(Logging.scala:60)
at org.apache.spark.sql.delta.Snapshot.logInfo(Snapshot.scala:370)
at org.apache.spark.sql.delta.Snapshot.<init>(Snapshot.scala:393)
at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$createSnapshot$4(SnapshotManagement.scala:356)
at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotFromGivenOrEquivalentLogSegment(SnapshotManagement.scala:480)
at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotFromGivenOrEquivalentLogSegment$(SnapshotManagement.scala:468)
at org.apache.spark.sql.delta.DeltaLog.createSnapshotFromGivenOrEquivalentLogSegment(DeltaLog.scala:74)
at org.apache.spark.sql.delta.SnapshotManagement.createSnapshot(SnapshotManagement.scala:349)
at org.apache.spark.sql.delta.SnapshotManagement.createSnapshot$(SnapshotManagement.scala:343)
at org.apache.spark.sql.delta.DeltaLog.createSnapshot(DeltaLog.scala:74)
at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$createSnapshotAtInitInternal$1(SnapshotManagement.scala:304)
at scala.Option.map(Option.scala:230)
at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotAtInitInternal(SnapshotManagement.scala:301)
at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotAtInitInternal$(SnapshotManagement.scala:298)
at org.apache.spark.sql.delta.DeltaLog.createSnapshotAtInitInternal(DeltaLog.scala:74)
at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$getSnapshotAtInit$1(SnapshotManagement.scala:293)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
at org.apache.spark.sql.delta.DeltaLog.recordFrameProfile(DeltaLog.scala:74)
at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit(SnapshotManagement.scala:288)
at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit$(SnapshotManagement.scala:287)
at org.apache.spark.sql.delta.DeltaLog.getSnapshotAtInit(DeltaLog.scala:74)
at org.apache.spark.sql.delta.SnapshotManagement.$init$(SnapshotManagement.scala:57)
at org.apache.spark.sql.delta.DeltaLog.<init>(DeltaLog.scala:80)
at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$4(DeltaLog.scala:790)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$3(DeltaLog.scala:785)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
at org.apache.spark.sql.delta.DeltaLog$.recordFrameProfile(DeltaLog.scala:595)
at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:133)
at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
at org.apache.spark.sql.delta.DeltaLog$.recordOperation(DeltaLog.scala:595)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:132)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:122)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:112)
at org.apache.spark.sql.delta.DeltaLog$.recordDeltaOperation(DeltaLog.scala:595)
at org.apache.spark.sql.delta.DeltaLog$.createDeltaLog$1(DeltaLog.scala:784)
at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$5(DeltaLog.scala:802)
at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
... 30 more
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.catalyst.FileSourceOptions$
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 91 more
1条答案
按热度按时间yqlxgs2m1#
最初我以为这个问题与spark-sql库有关。
后来我发现真实的的问题是因为delta-core2.4.0与Spark 3.3.1不兼容
基于https://docs.delta.io/latest/releases.html#compatibility-with-apache-spark
一旦我从
到
重新编译,然后再次提交。成功了!