如何检查sparkDataframe中是否存在列

drkbr07n  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(376)

如果Dataframe中不存在列,我尝试用逻辑返回空列。
模式更改非常频繁,有时会丢失整个结构( temp1 )或结构中的数组将丢失( suffix )
架构如下所示:

root
     |-- id: string (nullable = true)
     |-- temp: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- temp1: struct (nullable = true)
     |    |    |    |-- code: string (nullable = true)
     |    |    |    |-- code1: array (nullable = true)
     |    |    |    |    |-- element: string (containsNull = true)
     |    |    |-- temp2: struct (nullable = true)
     |    |    |    |-- name1: array (nullable = true)
     |    |    |    |    |-- element: string (containsNull = true)
     |    |    |    |-- suffix: array (nullable = true)
     |    |    |    |    |-- element: string (containsNull = true)
     |-- timestamp: timestamp (nullable = true)

或者像这样:

root
      |-- id: string (nullable = true)
      |-- temp: array (nullable = true)
      |    |-- element: struct (containsNull = true)
      |    |    |-- temp2: struct (nullable = true)
      |    |    |    |-- name1: array (nullable = true)
      |    |    |    |    |-- element: string (containsNull = true)
      |-- timestamp: timestamp (nullable = true)

当我为第二个模式尝试下面的逻辑时,得到一个struct找不到的异常

def has_Column(df: DataFrame, path: String) = Try(df(path)).isSuccess

df.withColumn("id", col("id")).
  withColumn("tempLn", explode(col("temp"))).
  withColumn("temp1_code1", when(lit(has_Column(df, "tempLn.temp1.code1")), concat_ws(" ",col("tempLn.temp1.code1"))).otherwise(lit("").cast("string"))).
  withColumn("temp2_suffix", when(lit(has_Column(df, "tempLn.temp2.suffix")), concat_ws(" ",col("tempLn.temp2.suffix"))).otherwise(lit("").cast("string")))

错误:
org.apache.spark.sql.analysisexception:没有这样的结构字段temp1;

kfgdxczn

kfgdxczn1#

您需要检查select/withcolumn之外是否存在。。。方法。就像你在书中提到的那样 then 作为case表达式的一部分,spark在分析查询时尝试解析它。
所以你需要这样测试:

if (has_Column(df, "tempLn.temp1.code1"))
   df.withColumn("temp2_suffix", concat_ws(" ",col("tempLn.temp2.suffix")))
else
   df.withColumn("temp2_suffix", lit(""))

要对多个列执行此操作,可以使用foldleft,如下所示:

val df1 = Seq(
  ("tempLn.temp1.code1", "temp1_code1"),
  ("tempLn.temp2.suffix", "temp2_suffix")
).foldLeft(df) {
  case (acc, (field, newCol)) => {
    if (has_Column(acc, field))
      acc.withColumn(newCol, concat_ws(" ", col(field)))
    else
      acc.withColumn(newCol, lit(""))
  }
}

相关问题