Scala: "java.lang.ClassNotFoundException: org.apache.spark.sql.catalyst.FileSourceOptions$" when spark-submitting to Amazon EMR

Asked by xlpyo6sf on 2023-06-06, tagged Scala

I have a Spark application. My build.sbt looks like this:

name := "IngestFromS3ToKafka"
version := "1.0"
scalaVersion := "2.12.17"
resolvers += "confluent" at "https://packages.confluent.io/maven/"

val sparkVersion = "3.3.1"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "org.apache.hadoop" % "hadoop-common" % "3.3.5" % "provided",
  "org.apache.hadoop" % "hadoop-aws" % "3.3.5" % "provided",
  "com.amazonaws" % "aws-java-sdk-bundle" % "1.12.475" % "provided",

  "org.apache.spark" %% "spark-avro" % sparkVersion,
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,
  "io.delta" %% "delta-core" % "2.4.0",
  "za.co.absa" %% "abris" % "6.3.0"
)

ThisBuild / assemblyMergeStrategy := {
  // https://stackoverflow.com/a/67937671/2000548
  case PathList("module-info.class") => MergeStrategy.discard
  case x if x.endsWith("/module-info.class") => MergeStrategy.discard
  // https://stackoverflow.com/a/76129963/2000548
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
  case x =>
    val oldStrategy = (ThisBuild / assemblyMergeStrategy).value
    oldStrategy(x)
}

I have an Amazon EMR cluster running Spark 3.3.1.
When I spark-submit to the EMR cluster, I get this error:

/bin/bash -c "/usr/bin/spark-submit --master yarn --deploy-mode client --class com.hongbomiao.IngestFromS3ToKafka --name ingest-from-s3-to-kafka /home/hadoop/IngestFromS3ToKafka-assembly-1.0.jar"
23/05/31 22:35:31 INFO SparkContext: Running Spark version 3.3.1-amzn-0
# ...
Exception in thread "main" com.google.common.util.concurrent.ExecutionError: java.lang.NoClassDefFoundError: org/apache/spark/sql/catalyst/FileSourceOptions$
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2261)
    at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
    at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
    at org.apache.spark.sql.delta.DeltaLog$.getDeltaLogFromCache$1(DeltaLog.scala:801)
    at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:811)
    at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:715)
    at org.apache.spark.sql.delta.DeltaLog$.forTable(DeltaLog.scala:663)
    at org.apache.spark.sql.delta.DeltaLog$.$anonfun$forTableWithSnapshot$1(DeltaLog.scala:720)
    at org.apache.spark.sql.delta.DeltaLog$.withFreshSnapshot(DeltaLog.scala:753)
    at org.apache.spark.sql.delta.DeltaLog$.forTableWithSnapshot(DeltaLog.scala:720)
    at org.apache.spark.sql.delta.sources.DeltaDataSource.sourceSchema(DeltaDataSource.scala:91)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:236)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118)
    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:34)
    at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:168)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:210)
    at com.hongbomiao.IngestFromS3ToKafka$.main(IngestFromS3ToKafka.scala:29)
    at com.hongbomiao.IngestFromS3ToKafka.main(IngestFromS3ToKafka.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1006)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1095)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1104)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NoClassDefFoundError: org/apache/spark/sql/catalyst/FileSourceOptions$
    at org.apache.spark.sql.delta.DeltaLog.indexToRelation(DeltaLog.scala:181)
    at org.apache.spark.sql.delta.DeltaLog.loadIndex(DeltaLog.scala:196)
    at org.apache.spark.sql.delta.Snapshot.$anonfun$protocolAndMetadataReconstruction$1(Snapshot.scala:210)
    at scala.collection.immutable.List.map(List.scala:293)
    at org.apache.spark.sql.delta.Snapshot.protocolAndMetadataReconstruction(Snapshot.scala:210)
    at org.apache.spark.sql.delta.Snapshot.x$1$lzycompute(Snapshot.scala:134)
    at org.apache.spark.sql.delta.Snapshot.x$1(Snapshot.scala:129)
    at org.apache.spark.sql.delta.Snapshot._metadata$lzycompute(Snapshot.scala:129)
    at org.apache.spark.sql.delta.Snapshot._metadata(Snapshot.scala:129)
    at org.apache.spark.sql.delta.Snapshot.metadata(Snapshot.scala:197)
    at org.apache.spark.sql.delta.Snapshot.toString(Snapshot.scala:390)
    at java.lang.String.valueOf(String.java:2994)
    at java.lang.StringBuilder.append(StringBuilder.java:136)
    at org.apache.spark.sql.delta.Snapshot.$anonfun$new$1(Snapshot.scala:393)
    at org.apache.spark.sql.delta.Snapshot.$anonfun$logInfo$1(Snapshot.scala:370)
    at org.apache.spark.internal.Logging.logInfo(Logging.scala:61)
    at org.apache.spark.internal.Logging.logInfo$(Logging.scala:60)
    at org.apache.spark.sql.delta.Snapshot.logInfo(Snapshot.scala:370)
    at org.apache.spark.sql.delta.Snapshot.<init>(Snapshot.scala:393)
    at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$createSnapshot$4(SnapshotManagement.scala:356)
    at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotFromGivenOrEquivalentLogSegment(SnapshotManagement.scala:480)
    at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotFromGivenOrEquivalentLogSegment$(SnapshotManagement.scala:468)
    at org.apache.spark.sql.delta.DeltaLog.createSnapshotFromGivenOrEquivalentLogSegment(DeltaLog.scala:74)
    at org.apache.spark.sql.delta.SnapshotManagement.createSnapshot(SnapshotManagement.scala:349)
    at org.apache.spark.sql.delta.SnapshotManagement.createSnapshot$(SnapshotManagement.scala:343)
    at org.apache.spark.sql.delta.DeltaLog.createSnapshot(DeltaLog.scala:74)
    at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$createSnapshotAtInitInternal$1(SnapshotManagement.scala:304)
    at scala.Option.map(Option.scala:230)
    at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotAtInitInternal(SnapshotManagement.scala:301)
    at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotAtInitInternal$(SnapshotManagement.scala:298)
    at org.apache.spark.sql.delta.DeltaLog.createSnapshotAtInitInternal(DeltaLog.scala:74)
    at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$getSnapshotAtInit$1(SnapshotManagement.scala:293)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
    at org.apache.spark.sql.delta.DeltaLog.recordFrameProfile(DeltaLog.scala:74)
    at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit(SnapshotManagement.scala:288)
    at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit$(SnapshotManagement.scala:287)
    at org.apache.spark.sql.delta.DeltaLog.getSnapshotAtInit(DeltaLog.scala:74)
    at org.apache.spark.sql.delta.SnapshotManagement.$init$(SnapshotManagement.scala:57)
    at org.apache.spark.sql.delta.DeltaLog.<init>(DeltaLog.scala:80)
    at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$4(DeltaLog.scala:790)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
    at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$3(DeltaLog.scala:785)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
    at org.apache.spark.sql.delta.DeltaLog$.recordFrameProfile(DeltaLog.scala:595)
    at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:133)
    at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
    at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
    at org.apache.spark.sql.delta.DeltaLog$.recordOperation(DeltaLog.scala:595)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:132)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:122)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:112)
    at org.apache.spark.sql.delta.DeltaLog$.recordDeltaOperation(DeltaLog.scala:595)
    at org.apache.spark.sql.delta.DeltaLog$.createDeltaLog$1(DeltaLog.scala:784)
    at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$5(DeltaLog.scala:802)
    at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
    at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
    at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
    at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
    ... 30 more
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.catalyst.FileSourceOptions$
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 91 more
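One way to confirm that the cluster's Spark is actually missing this class (rather than the assembly jar shading it away) is to probe the classpath from a `spark-shell` on the EMR master node. This is a diagnostic sketch, not part of the original post:

```scala
// Returns true if the named class is loadable on the current classpath.
def classPresent(name: String): Boolean =
  try { Class.forName(name); true }
  catch { case _: ClassNotFoundException => false }

// org.apache.spark.sql.catalyst.FileSourceOptions was added in Spark 3.4,
// so on a Spark 3.3.1 cluster this probe comes back false -- and it is
// exactly the class that delta-core 2.4.0 expects to find.
println(classPresent("org.apache.spark.sql.catalyst.FileSourceOptions$"))
```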

Answer from yqlxgs2m:

Initially I thought the problem was related to the spark-sql library.
Later I found the real cause: delta-core 2.4.0 is not compatible with Spark 3.3.1,
per the compatibility table at https://docs.delta.io/latest/releases.html#compatibility-with-apache-spark

Once I changed

"io.delta" %% "delta-core" % "2.4.0",

to

"io.delta" %% "delta-core" % "2.3.0",

rebuilt the assembly, and submitted again, it worked!
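Pinned in build.sbt (sbt files use Scala syntax), the working dependency looks like the fragment below. Per the Delta Lake compatibility table, the Delta 2.3.x line targets Spark 3.3.x, while 2.4.x targets Spark 3.4.x, which is why 2.4.0 references a catalyst class that a 3.3.1 cluster does not ship:

```scala
// Delta must match the Spark version the EMR cluster provides:
// delta-core 2.4.0 is built against Spark 3.4, which introduced
// org.apache.spark.sql.catalyst.FileSourceOptions; on Spark 3.3.1
// that class is absent, hence the ClassNotFoundException at runtime.
libraryDependencies += "io.delta" %% "delta-core" % "2.3.0"
```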
