我正在解析来自给定 RDD[String]
试着把它变成 Dataset
用给定的 case class
. 但是,当json字符串不包含 case class
我得到一个例外,那就是找不到缺少的列。
如何为这种情况定义默认值?
我尝试在 case class
但这并没有解决问题。我正在使用spark 2.3.2和scala 2.11.12。
这个代码运行良好
import org.apache.spark.rdd.RDD
case class SchemaClass(a: String, b: String)
val jsonData: String = """{"a": "foo", "b": "bar"}"""
val jsonRddString: RDD[String] = spark.sparkContext.parallelize(List(jsonData))
import spark.implicits._
val ds = spark.read.json(jsonRddString).as[SchemaClass]
当我运行这个代码时
val jsonDataIncomplete: String = """{"a": "foo"}"""
val jsonIncompleteRddString: RDD[String] = spark.sparkContext.parallelize(List(jsonDataIncomplete))
import spark.implicits._
val dsIncomplete = spark.read.json(jsonIncompleteRddString).as[SchemaClass]
dsIncomplete.printSchema()
dsIncomplete.show()
我得到以下例外
org.apache.spark.sql.AnalysisException: cannot resolve '`b`' given input columns: [a];
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:92)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:89)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$11.apply(TreeNode.scala:335)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:333)
[...]
有趣的是,当从文件解析json字符串时,会应用默认值“null”,如spark数据集文档中给出的示例所示:
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
json文件的内容
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
4条答案
按热度按时间woobm2wo1#
如果在同一个rdd中有不同的json字符串,那么它的工作方式是相同的。当只有一个与模式不匹配时,它将抛出错误。
如。
一种方法是将其转换为[person],您可以从中构建模式(structtype),并在读取json文件时应用它,
代码文件的内容是,
brgchamk2#
来自@sathiyan s的答案让我找到了以下解决方案(在这里介绍它,因为它并没有完全解决我的问题,而是指向了正确的方向):
好处:
所有缺少的字段都设置为
null
,独立于数据类型json字符串中的其他字段,这些字段不是
case class
将被忽略c86crjj03#
qjp7pelc4#
现在可以跳过将json作为rdd加载,然后作为df直接读取
val dsIncomplete = spark.read.json(Seq(jsonDataIncomplete).toDS)
如果您使用的是spark 2.2+加载json数据
从case类中提取模式或手动定义它
获取缺少的字段列表
默认值为
lit(null).cast(col.dataType)
缺少列。