Scala: how to default missing fields to "null" in a Dataset parsed from RDD[String] and apply a case class as the schema

ezykj2lf  posted on 2021-05-29  in  Spark

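I am parsing JSON strings from a given RDD[String] and trying to convert them into a Dataset with a given case class. However, when a JSON string does not contain all the fields required by the case class, I get an exception saying that the missing column cannot be found.
How can I define default values for such cases?
I tried defining default values in the case class, but that did not solve the problem. I am using Spark 2.3.2 and Scala 2.11.12.
This code works fine: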

import org.apache.spark.rdd.RDD

case class SchemaClass(a: String, b: String)

val jsonData: String = """{"a": "foo", "b": "bar"}"""
val jsonRddString: RDD[String] = spark.sparkContext.parallelize(List(jsonData))

import spark.implicits._
val ds = spark.read.json(jsonRddString).as[SchemaClass]

However, when I run this code:

val jsonDataIncomplete: String = """{"a": "foo"}"""
val jsonIncompleteRddString: RDD[String] = spark.sparkContext.parallelize(List(jsonDataIncomplete))

import spark.implicits._
val dsIncomplete = spark.read.json(jsonIncompleteRddString).as[SchemaClass]

dsIncomplete.printSchema()
dsIncomplete.show()

I get the following exception:

org.apache.spark.sql.AnalysisException: cannot resolve '`b`' given input columns: [a];
  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:92)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:89)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$11.apply(TreeNode.scala:335)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:285)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:333)
[...]

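Interestingly, the default value "null" is applied when the JSON strings are parsed from a file, as shown by the example in the Spark Dataset documentation: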

val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Content of the JSON file:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

woobm2wo1#

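It works the same way when the same RDD contains different JSON strings: as long as at least one record contains the field, the column exists and the other records get null. The error is thrown only when the RDD holds nothing but records that do not match the schema.
For example: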

val jsonIncompleteRddString: RDD[String] = spark.sparkContext.parallelize(List(jsonDataIncomplete, jsonData))

import spark.implicits._
val dsIncomplete = spark.read.json(jsonIncompleteRddString).as[SchemaClass]

dsIncomplete.printSchema()
dsIncomplete.show()

scala> dsIncomplete.show()
+---+----+
|  a|   b|
+---+----+
|foo|null|
|foo| bar|
+---+----+
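
The behaviour above follows from how Spark infers a JSON schema: the inferred columns are the union of the fields seen across all records, so a column b exists only if at least one record carries it. A minimal sketch to check this, assuming a local SparkSession and Spark 2.2+; the object InferredColumns and the helper inferredColumns are names made up for this illustration:

```scala
import org.apache.spark.sql.SparkSession

object InferredColumns {

  // Hypothetical helper: returns the column names Spark infers for the given JSON strings.
  def inferredColumns(spark: SparkSession, jsonStrings: Seq[String]): Seq[String] = {
    import spark.implicits._
    // spark.read.json(Dataset[String]) is available from Spark 2.2 on
    spark.read.json(jsonStrings.toDS()).columns.toSeq
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session is good enough for the experiment
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("inferred-columns")
      .getOrCreate()

    val complete   = """{"a": "foo", "b": "bar"}"""
    val incomplete = """{"a": "foo"}"""

    // Only the incomplete record: no record mentions "b", so no column "b" is inferred
    println(inferredColumns(spark, Seq(incomplete)).mkString(", "))
    // Both records: the inferred schema is the union of the fields
    println(inferredColumns(spark, Seq(incomplete, complete)).mkString(", "))

    spark.stop()
  }
}
```

This also explains why the people.json example from the question works: the file contains records that do carry the age field.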

One way: instead of relying only on as[Person], build the schema (StructType) from the case class and apply it when reading the JSON file:

import org.apache.spark.sql.Encoders

val schema = Encoders.product[Person].schema

val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.schema(schema).json(path).as[Person]
peopleDS.show
+-------+----+
|   name| age|
+-------+----+
|Michael|null|
+-------+----+

The content of the JSON file is:

{"name":"Michael"}

brgchamk2#

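The answer from @Sathiyan S led me to the following solution (I present it here because that answer did not completely solve my problem, but it pointed in the right direction):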

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.{StructField, StructType}

// create the expected schema from the case class
val schema = Encoders.product[SchemaClass].schema

// mark all fields as nullable
val newSchema = StructType(schema.map {
  case StructField(c, t, _, m) => StructField(c, t, nullable = true, m)
})

// apply the expected, nullable schema when parsing the JSON strings
// (spark is the SparkSession from the question; spark.implicits._ must be in scope)
spark.read.schema(newSchema).json(jsonIncompleteRddString).as[SchemaClass]

Benefits:
All missing fields are set to null, independent of the data type
Additional fields in the JSON string that are not part of the case class are ignored
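
One caveat that may be worth checking against your own data: a column can hold null for any type, but a primitive case class field such as Int cannot receive null once rows are decoded into objects, so a common approach is to declare such fields as Option. A minimal sketch under that assumption; Record, OptionalFields and parseWithSchema are hypothetical names, not part of the question:

```scala
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}

// Hypothetical case class: Option marks a field that may be absent in the JSON
case class Record(a: String, b: Option[Int])

object OptionalFields {

  // Parses JSON strings with the schema derived from Record instead of an inferred one
  def parseWithSchema(spark: SparkSession, jsonStrings: Seq[String]): Dataset[Record] = {
    import spark.implicits._
    // Option[Int] becomes a nullable integer column in the derived schema
    val schema = Encoders.product[Record].schema
    spark.read.schema(schema).json(jsonStrings.toDS()).as[Record]
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session, Spark 2.2+ for json(Dataset[String])
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("optional-fields")
      .getOrCreate()

    val parsed = parseWithSchema(
      spark,
      Seq("""{"a": "foo"}""", """{"a": "bar", "b": 1, "m": "extra"}""")
    )
    // The missing "b" is decoded as None; the extra field "m" is ignored
    parsed.collect().foreach(println)

    spark.stop()
  }
}
```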


c86crjj03#

package spark

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, Encoders, SparkSession}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions.{col, lit}

object JsonDF extends App {

  val spark = SparkSession.builder()
    .master("local")
    .appName("DataFrame-example")
    .getOrCreate()

  import spark.implicits._

  case class SchemaClass(a: String, b: Int)

  val jsonDataIncomplete: String = """{"a": "foo", "m": "eee"}"""
  val jsonIncompleteRddString: RDD[String] = spark.sparkContext.parallelize(List(jsonDataIncomplete))

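  // Let Spark infer the schema: column "b" is absent and "m" is extra
  val dsIncomplete = spark.read.json(jsonIncompleteRddString)  // .as[SchemaClass]

  lazy val schema: StructType    = Encoders.product[SchemaClass].schema
  lazy val fields: Array[String] = schema.fieldNames
  lazy val colNames: Array[Column]  = fields.map(col(_))

  // Fields expected by the case class but missing from the data.
  // Compared by name: comparing whole StructFields would also flag columns whose
  // inferred type or nullability differs and overwrite them with null.
  val sch = dsIncomplete.schema
  val schemaDiff = schema.filterNot(f => sch.fieldNames.contains(f.name))
  val rr = schemaDiff.foldLeft(dsIncomplete)((acc, col) => {
    acc.withColumn(col.name, lit(null).cast(col.dataType))
  })

  // Columns present in the data but not in the case class
  val schF = dsIncomplete.schema
  val schDiff = schF.filterNot(f => fields.contains(f.name))

  // Drop the extra columns and select the rest in case class order
  val rrr = schDiff.foldLeft(rr)((acc, col) => {
    acc.drop(col.name)
  })
    .select(colNames: _*)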

}

qjp7pelc4#

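If you are using Spark 2.2+, you can skip loading the JSON as an RDD and then reading it as a DataFrame, and instead read it directly: val dsIncomplete = spark.read.json(Seq(jsonDataIncomplete).toDS)
1. Load the JSON data
2. Extract the schema from the case class, or define it manually
3. Get the list of missing fields
4. Default the value of each missing column to lit(null).cast(col.dataType)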

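import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructField, StructType}

object DefaultFieldValue {

  def main(args: Array[String]): Unit = {

    // The original answer used its own helper (Constant.getSparkSess);
    // a local session is assumed here so the example runs on its own
    val spark = SparkSession.builder().master("local").appName("DefaultFieldValue").getOrCreate()

    import spark.implicits._
    val jsonDataIncomplete: String = """{"a": "foo"}"""
    val dsIncomplete = spark.read.json(Seq(jsonDataIncomplete).toDS)
    val schema: StructType = Encoders.product[SchemaClass].schema

    val fields: Array[StructField] = schema.fields

    // Compare by field name: a StructField is never equal to a column-name String,
    // so diffing the two arrays directly cannot exclude the existing column "a"
    val missing = fields.filterNot(f => dsIncomplete.columns.contains(f.name))

    // Add every missing column as a null cast to the expected type
    val outdf = missing.foldLeft(dsIncomplete)((acc, col) => {
      acc.withColumn(col.name, lit(null).cast(col.dataType))
    })

    outdf.printSchema()
    outdf.show()

  }
}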

case class SchemaClass(a: String, b: Int, c: String, d: Double)
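
The steps listed in this answer can also be wrapped in a small reusable function. The following is only a sketch under stated assumptions: ConformToSchema and conformTo are hypothetical names, a local SparkSession and Spark 2.2+ are assumed, and existing columns are not cast to the target types.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object ConformToSchema {

  // Hypothetical helper: adds every column of `target` that `df` lacks as a typed null,
  // then selects the target columns in order, which also drops any extra columns.
  def conformTo(df: DataFrame, target: StructType): DataFrame = {
    val existing = df.columns.toSet
    val withMissing = target.fields
      .filterNot(f => existing.contains(f.name))
      .foldLeft(df)((acc, f) => acc.withColumn(f.name, lit(null).cast(f.dataType)))
    withMissing.select(target.fieldNames.map(n => col(n)): _*)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("conform-to-schema")
      .getOrCreate()
    import spark.implicits._

    // Target schema written by hand here; Encoders.product[T].schema would work as well
    val target = StructType(Seq(
      StructField("a", StringType, nullable = true),
      StructField("b", IntegerType, nullable = true)
    ))

    // "b" is missing and "m" is not part of the target schema
    val raw = spark.read.json(Seq("""{"a": "foo", "m": "eee"}""").toDS())
    val conformed = conformTo(raw, target)

    conformed.printSchema()
    conformed.show()

    spark.stop()
  }
}
```

Selecting by the target field names at the end makes a separate drop step unnecessary and keeps the column order aligned with the schema.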
