nosuchmethoderror

ddarikpa  于 2021-07-12  发布在  Spark
关注(0)|答案(0)|浏览(303)

我正在尝试向集群提交一个 Spark 作业,该作业从 Azure Event Hub 读取二进制编码的流式 DataFrame,然后按照 protobuf 文件定义的格式对其进行转换。
build.sbt:

import sbtassembly.AssemblyPlugin.autoImport.ShadeRule

// Project identity, plus the Scala version that every `%%` dependency in this
// build is resolved against.
name := "AdQualitySpark"
version := "0.1"
scalaVersion := "2.12.12"

// Pre-built jars picked up from the local lib/ directory instead of a repository.
// NOTE(review): the Cosmos DB connector file name carries a `_2.11` suffix while
// scalaVersion above is 2.12.12 -- confirm this jar is binary compatible.
unmanagedJars in Compile ++= Seq(
  "lib/geneva-java-0.1.0.jar",
  "lib/bond-7.0.0-preview-2017-11-22.jar",
  "lib/azure-cosmosdb-spark_2.4.0_2.11-3.6.7-uber.jar"
).map(jarPath => file(jarPath))

// Additional repository used when resolving dependencies.
resolvers += "MMLSpark Repo" at "https://mmlspark.azureedge.net/maven"

// Library dependencies of the job. Entries marked "provided" are compiled against
// but are not declared as runtime dependencies of the application.
libraryDependencies ++= Seq(
  "org.apache.commons" % "commons-lang3" % "3.1",
  "org.apache.spark" %% "spark-core" % "3.0.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "3.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "3.0.0" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.0.0",
  "org.apache.kafka" % "kafka-clients" % "0.8.2.1" % "provided",
  "org.apache.spark" %% "spark-mllib" % "3.0.0" % "provided",
//  "com.databricks" %% "spark-csv" % "1.5.0",
  "org.rogach" %% "scallop" % "3.1.5",
  "org.scalaj" %% "scalaj-http" % "2.4.1",
  "com.microsoft.azure" %% "azure-eventhubs-spark" % "2.3.15",
  "org.scalatest" %% "scalatest" % "3.0.0" % "provided",
  "com.microsoft.ml.spark" %% "mmlspark" % "1.0.0-rc3-27-b1c14008-SNAPSHOT",
  "com.holdenkarau" %% "spark-testing-base" % "3.0.0_1.0.0" % "provided",
  "com.typesafe" % "config" % "1.4.0",
  // NOTE(review): the ScalaPB compiler plugin normally belongs in project/plugins.sbt
  // only; if it stays here, keep its version in step with the plugin declared there.
  "com.thesamet.scalapb" %% "compilerplugin" % "0.9.4",
  // Runtime version follows the ScalaPB plugin on the build classpath. For the
  // sparksql-scalapb release below that plugin must be on the 0.10.x line.
  "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion, //% "protobuf",
  "com.google.protobuf" % "protobuf-java-util" % "3.11.1",
  // sparksql-scalapb 0.9.x (and the frameless it pulls in) is compiled against
  // Spark 2.4 and calls an ExpressionEncoder constructor that Spark 3.0 no longer
  // has, which surfaces as NoSuchMethodError at runtime. 0.10.4 targets Spark 3.0.
  "com.thesamet.scalapb" %% "sparksql-scalapb" % "0.10.4",
  // Kept on the same ScalaPB 0.10.x line as sparksql-scalapb above.
  "com.thesamet.scalapb" %% "scalapb-json4s" % "0.10.1",
  "com.microsoft.azure" % "azure-data-lake-store-sdk" % "2.3.8",
  "com.databricks" %% "dbutils-api" % "0.0.4",
  "io.delta" %% "delta-core" % "0.7.0",
  "com.microsoft.sqlserver" % "mssql-jdbc" % "8.2.1.jre8",
  "org.apache.spark" %% "spark-avro" % "3.0.0"
  // "com.databricks" %% "spark-avro" % "3.2.0"
)

// Needed for CosmosDB Spark connector.
// Pin the versions of transitive dependencies that conflict resolution would
// otherwise choose on its own (needed for the CosmosDB Spark connector).
dependencyOverrides ++= Seq(
  "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.11.0",
  "com.fasterxml.jackson.core" % "jackson-databind" % "2.11.0",
  "com.fasterxml.jackson.core" % "jackson-core" % "2.11.0",
  "com.google.guava" % "guava" % "15.0",
  "org.json4s" %% "json4s-jackson" % "3.6.10"
)

/*
lazy val excludeJpountz = ExclusionRule(organization = "net.jpountz.lz4", name = "lz4")
lazy val kafkaClients = "org.apache.kafka" % "kafka-clients" % "0.8.2.1" excludeAll(excludeJpountz) // add more exclusions here*/

// Relocate packages that clash with the copies shipped by the Spark runtime.
//  - com.google.protobuf: the application bundles its own protobuf runtime, renamed
//    so it cannot collide with the one already on the cluster classpath.
//  - scala.collection.compat: the ScalaPB Spark guide recommends shading it as well
//    on Spark 3, which ships its own incompatible copy.
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.protobuf.**" -> "shadeproto.@1").inAll,
  ShadeRule.rename("scala.collection.compat.**" -> "scalacompat.@1").inAll
)

scalacOptions += "-Xmacro-settings:materialize-derivations"
// NOTE(review): javaOptions normally only apply to forked JVMs -- confirm this
// setting has any effect in the `assembly` scope.
javaOptions in assembly += "-Xmx2g"

// Exclude from the fat jar every classpath entry whose file name contains none of
// the fragments listed below; only jars matching at least one fragment are bundled.
assemblyExcludedJars in assembly := {
  val bundledNameFragments = Seq(
    "mml", "http", "proton",
    "spray", "scallop", "compat",
    "eventhub", "kafka", "scalapb",
    "compilerplugin", "lenses", "protoc",
    "frameless", "shadeproto",
    "geneva", "mssql-jdbc", "spark-avro",
    "bond", "cosmosdb", "delta"
  )
  val classpathEntries = (fullClasspath in assembly).value
  classpathEntries.filterNot { entry =>
    val jarName = entry.data.getName
    bundledNameFragments.exists(fragment => jarName.contains(fragment))
  }
  /*
    Debug aid: print every classpath entry together with its size.
    classpathEntries foreach { f => println(f.data.getName, f.data.getAbsoluteFile().length()) }
    classpathEntries */
}

// Conflict resolution for files that occur in more than one jar: anything under
// META-INF is dropped, and for every other path the first occurrence is kept.
// NOTE: `assemblyMergeStrategy in assembly` is assigned again near the end of this
// file; with `:=` the last assignment is the one that takes effect.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}

// Run the ScalaPB generator and write its output to the managed-sources directory
// of the Compile configuration.
PB.targets in Compile := Seq(scalapb.gen() -> (sourceManaged in Compile).value)
/*
(scalastyleConfig in Test) := baseDirectory.value / "scalastyleconfig.xml"
logBuffered in Test := false

lazy val testScalastyle = taskKey[Unit]("testScalastyle")
testScalastyle := scalastyle.in(Test).toTask("").value
(test in Test) := ((test in Test) dependsOn testScalastyle).value */

// Task that runs scalastyle on the Compile sources; `compile` is rewired below so
// that it depends on this task.
lazy val compileScalastyle = taskKey[Unit]("compileScalastyle")
compileScalastyle := (scalastyle in Compile).toTask("").value
(compile in Compile) := (compile in Compile).dependsOn(compileScalastyle).value

// Do not run test suites in parallel.
parallelExecution in Test := false

// Conflict resolution for files that occur in more than one jar.
// META-INF/services entries are merged rather than discarded: they hold the
// ServiceLoader registrations (for example Spark's DataSourceRegister) that
// bundled connectors rely on to be found by their short format name.
// The remaining META-INF content is dropped, and for every other path the
// first occurrence is kept.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", "services", _*) => MergeStrategy.filterDistinctLines
  case PathList("META-INF", _*) => MergeStrategy.discard
  case _ => MergeStrategy.first
}

// Leave the Scala library out of the assembled jar.
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
// TODO: start including tests in build
logBuffered in Test := false

引发异常的源代码是:

def editorialAdStream(sc: SparkContext, spark: SparkSession): Dataset[AdEntity] = {

val ehConnectionString = ConnectionStringBuilder(ConnectionString)
  .setEventHubName(EventHubName)
  .build

val customEventhubParameters =
  EventHubsConf(ehConnectionString)
    .setConsumerGroup(ConsumerGroup)
    .setEndingPosition(EventPosition.fromEndOfStream)
    .setMaxEventsPerTrigger(MaxEventsPerTrigger)
    .setPrefetchCount(PrefetchCount)

val binaryAdStream = spark
  .readStream
  .format("org.apache.spark.sql.eventhubs.EventHubsSourceProvider")
  .options(customEventhubParameters.toMap)
  .load()
  .selectExpr("body")
  .as(Encoders.BINARY)

val adStream: Dataset[EditorialAdEntity] =
  binaryAdStream
    .map(m => EditorialAdEntity().mergeFrom(CodedInputStream.newInstance(m)))

异常堆栈跟踪是:

java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.<init>(Lorg/apache/spark/sql/types/StructType;ZLscala/collection/Seq;Lorg/apache/spark/sql/catalyst/expressions/Expression;Lscala/reflect/ClassTag;)V
    at frameless.TypedExpressionEncoder$.apply(TypedExpressionEncoder.scala:45)
    at scalapb.spark.Implicits.typedEncoderToEncoder(TypedEncoders.scala:125)
    at scalapb.spark.Implicits.typedEncoderToEncoder$(TypedEncoders.scala:122)
    at scalapb.spark.Implicits$.typedEncoderToEncoder(TypedEncoders.scala:128)
    at Utils.MessagingQueues.EventHubSourceReader$.editorialAdStream(EventHubSourceReader.scala:57)
    at Utils.MessagingQueues.SourceReader$.readEditorialAdSource(SourceReader.scala:39)
    at Workflows.Streaming.PROD.UnifiedNRT.UnifiedNRTHelper$.fetchInputStream(UnifiedNRTHelper.scala:62)
    at Workflows.Streaming.PROD.UnifiedNRT.UnifiedNRT$.main(UnifiedNRT.scala:50)

已知事实:[1] 项目使用的是 Spark 3.0;[2] Scala 版本是 2.12.12;[3] 我已按照官方文档尝试将 scalapb 相关库更改为不同的版本,但没有帮助:https://scalapb.github.io/docs/sparksql/#setting-up-your-project
请帮助解决此异常。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题