使用spark/livy的todf时发生nosuchelementexception

qlzsbp2j  于 2021-06-09  发布在  Hbase
关注(0)|答案(1)|浏览(541)

我试图从spark中生成一个sparkDataframe,它已经使用apachelivy进行了初始化。
我第一次在这个更复杂的hbase调用中注意到这个问题:

  1. import spark.implicits._
  2. ...
  3. spark.sparkContext
  4. .newAPIHadoopRDD(
  5. conf,
  6. classOf[TableInputFormat],
  7. classOf[ImmutableBytesWritable],
  8. classOf[Result]
  9. )
  10. .toDF()

但我发现我可以在一个简单的

  1. import spark.implicits._
  2. ...
  3. val filtersDf = filters.toDF()

哪里, filtersDf 只是一系列案例类。
共同的问题是 *.toDF() 但是,它也发生在 *.toDS() ,这让我觉得 import spark.implicits._ 不起作用。要转换为dataframes的底层对象确实有数据。
错误堆栈似乎与使用scala运行时反射的运行时隐式解析有关。
注意,我已经检查过了,spark和编译的代码都使用相同版本的scala(2.11)。
我得到的例外是:

  1. java.lang.RuntimeException: java.util.NoSuchElementException: head of empty list
  2. scala.collection.immutable.Nil$.head(List.scala:420)
  3. scala.collection.immutable.Nil$.head(List.scala:417)
  4. scala.collection.immutable.List.map(List.scala:277)
  5. scala.reflect.internal.Symbols$Symbol.parentSymbols(Symbols.scala:2117)
  6. scala.reflect.internal.SymbolTable.openPackageModule(SymbolTable.scala:301)
  7. scala.reflect.internal.SymbolTable.openPackageModule(SymbolTable.scala:341)
  8. scala.reflect.runtime.SymbolLoaders$LazyPackageType$$anonfun$complete$2.apply$mcV$sp(SymbolLoaders.scala:74)
  9. scala.reflect.runtime.SymbolLoaders$LazyPackageType$$anonfun$complete$2.apply(SymbolLoaders.scala:71)
  10. scala.reflect.runtime.SymbolLoaders$LazyPackageType$$anonfun$complete$2.apply(SymbolLoaders.scala:71)
  11. scala.reflect.internal.SymbolTable.slowButSafeEnteringPhaseNotLaterThan(SymbolTable.scala:263)
  12. scala.reflect.runtime.SymbolLoaders$LazyPackageType.complete(SymbolLoaders.scala:71)
  13. scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1514)
  14. scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(SynchronizedSymbols.scala:174)
  15. scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
  16. scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
  17. scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
  18. scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
  19. scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:123)
  20. scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:174)
  21. scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.info(SynchronizedSymbols.scala:127)
  22. scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1.info(SynchronizedSymbols.scala:174)
  23. scala.reflect.internal.Types$TypeRef.thisInfo(Types.scala:2194)
  24. scala.reflect.internal.Types$TypeRef.baseClasses(Types.scala:2199)
  25. scala.reflect.internal.tpe.FindMembers$FindMemberBase.<init>(FindMembers.scala:17)
  26. scala.reflect.internal.tpe.FindMembers$FindMember.<init>(FindMembers.scala:219)
  27. scala.reflect.internal.Types$Type.scala$reflect$internal$Types$Type$$findMemberInternal$1(Types.scala:1014)
  28. scala.reflect.internal.Types$Type.findMember(Types.scala:1016)
  29. scala.reflect.internal.Types$Type.memberBasedOnName(Types.scala:631)
  30. scala.reflect.internal.Types$Type.member(Types.scala:600)
  31. scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:48)
  32. scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:66)
  33. scala.reflect.internal.Mirrors$RootsBase.staticPackage(Mirrors.scala:204)
  34. scala.reflect.runtime.JavaMirrors$JavaMirror.staticPackage(JavaMirrors.scala:82)
  35. scala.reflect.internal.Mirrors$RootsBase.init(Mirrors.scala:263)
  36. scala.reflect.runtime.JavaMirrors$class.scala$reflect$runtime$JavaMirrors$$createMirror(JavaMirrors.scala:32)
  37. scala.reflect.runtime.JavaMirrors$$anonfun$runtimeMirror$1.apply(JavaMirrors.scala:49)
  38. scala.reflect.runtime.JavaMirrors$$anonfun$runtimeMirror$1.apply(JavaMirrors.scala:47)
  39. scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
  40. scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
  41. scala.reflect.runtime.JavaMirrors$class.runtimeMirror(JavaMirrors.scala:46)
  42. scala.reflect.runtime.JavaUniverse.runtimeMirror(JavaUniverse.scala:16)
  43. scala.reflect.runtime.JavaUniverse.runtimeMirror(JavaUniverse.scala:16)

我的工作假设是我缺少一个依赖项或导入,这是一种scala主义。
我还没有找到任何其他提到这个问题的地方。最终我认为这可能是由于导入/依赖,但到目前为止,我还不太清楚它是什么。非常感谢您的帮助。我很想知道解决这个问题的方法,或者通过比其他方法更不神奇的方法来创建Dataframe toDf() .
spark信息:

  1. Welcome to
  2. ____ __
  3. / __/__ ___ _____/ /__
  4. _\ \/ _ \/ _ `/ __/ '_/
  5. /___/ .__/\_,_/_/ /_/\_\ version 2.3.2.0-mapr-1901
  6. /_/
  7. Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_191)
ktca8awb

ktca8awb1#

我在同一版本的spark上遇到了这个错误,但是,我在从hdfs读取csv时看到错误出现,这就是我正在做的一个示例:

  1. val csv: DataFrame = ss
  2. .read
  3. .option("header", "true")
  4. .option("mode", "DROPMALFORMED")
  5. .csv(filePath)
  6. println(csv.count())

这就是我看到错误起源于Spark的地方。
我举了一个失败的小例子,试图找出问题的根源。我使用livyscala编程api将作业提交给spark。我发现它之所以失败是因为我通过livy将类型作为参数传递给spark,这是有意义的,因为这是一个scala反射错误。
例如,此操作失败:

  1. case class FailingJob(someSeq: Seq[String], filePath: String) {
  2. ...
  3. def call(scalaJobContext: ScalaJobContext): Unit = {
  4. // It doesn't really matter what I do here.
  5. // Main this is that the seq is used in some way.
  6. val mappedSeq = someSeq.map(s => s.toUpperCase())
  7. val ss: SparkSession = scalaJobContext.sparkSession
  8. val csv: DataFrame = ss
  9. .read
  10. .option("header", "true")
  11. .option("mode", "DROPMALFORMED")
  12. .csv(filePath)
  13. println(csv.count())
  14. ...
  15. for {
  16. _ <- livyClient.submit(FailingJob(someSeq, path).call)
  17. ...

鉴于这是成功的:

  1. case class SuccessfulJob(someArray: Array[String], filePath: String) {
  2. ...
  3. def call(scalaJobContext: ScalaJobContext): Unit = {
  4. // It doesn't really matter what I do here.
  5. // Main this is that the seq is used in some way.
  6. val mappedSeq = someArray.map(s => s.toUpperCase())
  7. val ss: SparkSession = scalaJobContext.sparkSession
  8. val csv: DataFrame = ss
  9. .read
  10. .option("header", "true")
  11. .option("mode", "DROPMALFORMED")
  12. .csv(filePath)
  13. println(csv.count())
  14. ...
  15. for {
  16. _ <- livyClient.submit(SuccessfulJob(someArray, path).call)
  17. ...

如果我传入一个参数 Seq 这失败了,这让我认为这是serrialization/deserialization wihtin的问题 kryo . 另外要注意的是,如果我将值作为对象的属性引用,则不会引发此错误。我尝试升级到spark 2.4,但运气不好。我用的是livy版本 0.6.0-incubating . 我目前的工作是转换要使用的对象 Array 类型而不是 Seq . 我怀疑其他特定于scala的类型也会失败,尽管我还没有尝试过。
这是我对这个问题的复制品,包括解决方法。我很感激这不能回答这个问题,但希望它能帮助其他人在其他场景中调试这个问题。我还向livy提交了一个问题,看看他们是否能对正在发生的事情有更多的了解。

展开查看全部

相关问题