Spark Dataset example: "Unable to generate an encoder" issue

xeufq47z asked on 2021-05-27 in Spark

I'm new to the Spark world and tried a Dataset example written in Scala that I found online.
When running it through sbt, I keep getting the following error: org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class. Any idea what I am missing?
Also feel free to point out a better way of writing the same Dataset example.
Thanks.

    > sbt> runMain DatasetExample
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    16/10/25 01:06:39 INFO Remoting: Starting remoting
    16/10/25 01:06:46 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.150.130:50555]
    [error] (run-main-6) org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class `DatasetExample$Student` without access to the scope that this class was defined in. Try moving this class out of its parent class.;
    org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class `DatasetExample$Student` without access to the scope that this class was defined in. Try moving this class out of its parent class.;
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$3.applyOrElse(ExpressionEncoder.scala:306)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$3.applyOrElse(ExpressionEncoder.scala:302)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:259)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:259)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:258)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:249)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolve(ExpressionEncoder.scala:302)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:79)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:90)
    at org.apache.spark.sql.DataFrame.as(DataFrame.scala:209)
    at DatasetExample$.main(DatasetExample.scala:45)
    at DatasetExample.main(DatasetExample.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    [trace] Stack trace suppressed: run last sparkExamples/compile:runMain for the full output.
    java.lang.RuntimeException: Nonzero exit code: 1
    at scala.sys.package$.error(package.scala:27)
    [trace] Stack trace suppressed: run last sparkExamples/compile:runMain for the full output.
    [error] (sparkExamples/compile:runMain) Nonzero exit code: 1
    [error] Total time: 127 s, completed Oct 25, 2016 1:08:09 AM

Code:

    import org.apache.spark._
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql._
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.functions._

    object DatasetExample {

      // Create data sets
      case class Student(name: String, dept: String, age: Long)
      case class Department(abbrevName: String, fullName: String)

      org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this) // Not sure what exactly is the purpose

      def main(args: Array[String]) {
        Logger.getLogger("org").setLevel(Level.OFF)
        Logger.getLogger("akka").setLevel(Level.OFF)

        // initialise spark context
        val conf = new SparkConf().setAppName("SetsExamples").setMaster("local")
        val sc = new SparkContext(conf)
        val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
        import sqlcontext.implicits._ // Not sure what exactly is the purpose

        // Read JSON objects into a Dataset[Student].
        val students = sqlcontext.read.json("student.json").as[Student]
        students.show()

        // Select two columns and filter on one column.
        // Each argument of "select" must be a "TypedColumn".
        students.select($"name".as[String], $"dept".as[String]).
          filter(_._2 == "Math"). // Filter on _2, the second selected column
          collect()

        // Group by department and count each group.
        students.groupBy(_.dept).count().collect()

        // Group and aggregate in each group.
        students.groupBy(_.dept).
          agg(avg($"age").as[Double]).
          collect()

        // Initialize a Seq and convert to a Dataset.
        val depts = Seq(Department("CS", "Computer Science"), Department("Math", "Mathematics")).toDS()

        // Show the contents of the Dataset.
        depts.show()

        // Join two datasets with "joinWith".
        val joined = students.joinWith(depts, $"dept" === $"abbrevName")

        // Show the contents of the joined Dataset.
        // Note that the original objects are nested into tuples under the _1 and _2 columns.
        joined.show()

        // terminate spark context
        sc.stop()
      }
    }

JSON file (student.json):

  1. {"id" : "1201", "name" : "Kris", "age" : "25"}
  2. {"id" : "1202", "name" : "John", "age" : "28"}
  3. {"id" : "1203", "name" : "Chet", "age" : "39"}
  4. {"id" : "1204", "name" : "Mark", "age" : "23"}
  5. {"id" : "1205", "name" : "Vic", "age" : "23"}

6pp0gazn (Answer 1)

This line is what causes the problem:

    org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

It means that you are adding a new outer scope to the current context, which can be used when instantiating an inner class during deserialization.
Inner classes are created when a case class is defined inside the Spark REPL; registering the outer scope that such a class was defined in is what allows Spark to create new instances of it on the executors.
In normal usage (your case), you should not need to call this function at all.
Edit: you also need to move the case classes out of the DatasetExample object.
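For reference, a minimal sketch of the restructured file (only the JSON-reading part of the posted code is shown, and the same Spark 1.6-era SQLContext API is assumed), with the case classes at the top level and the addOuterScope call removed:

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // Top-level case classes: Spark can derive encoders for them
    // without needing a reference to an enclosing scope.
    case class Student(name: String, dept: String, age: Long)
    case class Department(abbrevName: String, fullName: String)

    object DatasetExample {
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.OFF)
        Logger.getLogger("akka").setLevel(Level.OFF)

        val conf = new SparkConf().setAppName("SetsExamples").setMaster("local")
        val sc = new SparkContext(conf)
        val sqlcontext = new SQLContext(sc)
        import sqlcontext.implicits._

        // No OuterScopes.addOuterScope(this) is needed once the case classes are top level.
        // Note: student.json must actually contain name, dept and age fields
        // for .as[Student] to resolve.
        val students = sqlcontext.read.json("student.json").as[Student]
        students.show()

        sc.stop()
      }
    }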
Note: import sqlContext.implicits._ is a Scala-specific import of the implicit conversions and encoders that turn common Scala objects and RDDs into DataFrames and Datasets.
More on this.
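As a rough illustration of what that import brings into scope (ImplicitsDemo is a made-up name, and the Spark 1.6-style SQLContext is assumed, as in the posted code):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    case class Department(abbrevName: String, fullName: String)

    object ImplicitsDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("ImplicitsDemo").setMaster("local"))
        val sqlcontext = new SQLContext(sc)
        import sqlcontext.implicits._ // enables toDS/toDF and the $"col" column syntax

        // Seq -> Dataset, using the encoder made available by the import
        val depts = Seq(Department("CS", "Computer Science"),
                        Department("Math", "Mathematics")).toDS()

        // Dataset -> DataFrame, then select a column with the $ interpolator
        depts.toDF().select($"abbrevName").show()

        sc.stop()
      }
    }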
