遍历sparkDataframe的列并更新指定的值

r1zhe5dt  于 2021-06-26  发布在  Hive
关注(0)|答案(1)|浏览(388)

为了遍历从配置单元表创建的sparkDataframe的列并更新所有出现的所需列值,我尝试了以下代码。

import org.apache.spark.sql.{DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.udf

val a: DataFrame = spark.sql(s"select * from default.table_a")

    val column_names: Array[String] = a.columns

    val required_columns: Array[String] = column_names.filter(name => name.endsWith("_date")) 

    val func = udf((value: String) => { if if (value == "XXXX" || value == "WWWW" || value == "TTTT") "NULL" else value } )

    val b = {for (column: String <- required_columns) { a.withColumn(column , func(a(column))) } a}

在sparkshell中执行代码时,我得到了以下错误。

scala> val b = {for (column: String <- required_columns) { a.withColumn(column , func(a(column))) } a}
<console>:35: error: value a is not a member of org.apache.spark.sql.DataFrame
       val b = {for (column: String <- required_column_list) { a.withColumn(column , isNull(a(column))) } a}
                                                                                                          ^

我还尝试了下面的语句,但没有得到所需的输出。

val b = for (column: String <- required_columns) { a.withColumn(column , func(a(column))) }

变量b被创建为一个单元而不是Dataframe。

scala> val b = for (column: String <- required_columns) { a.withColumn(column , func(a(column))) }
    b: Unit = ()

请提出更好的方法来遍历dataframe的列并更新列中所有出现的值,或者纠正我的错误。任何其他解决方案也值得赞赏。提前谢谢。

kx5bkwkv

kx5bkwkv1#

不要使用for循环,而应该使用 foldLeft . 你不需要一个 udf 功能, when 可以使用内置函数

val column_names: Array[String] = a.columns

val required_columns: Array[String] = column_names.filter(name => name.endsWith("_date"))

import org.apache.spark.sql.functions._
val b = required_columns.foldLeft(a){(tempdf, colName) => tempdf.withColumn(colName, when(col(colName) === "XXX" || col(colName) === "WWWW" || col(colName) === "TTTT", "NULL").otherwise(col(colName)))}

我希望答案是有帮助的

说明:

在里面
required_columns.foldLeft(a){(tempdf, colName) => tempdf.withColumn(colName, when(col(colName) === "XXX" || col(colName) === "WWWW" || col(colName) === "TTTT", "NULL").otherwise(col(colName)))} required_columns 是来自的列名数组 a Dataframe/数据集 _date 作为结束字符串 colName 内部
withColumn tempdf 是原始Dataframe/数据集,即。 a 当函数在内部应用时 withColumn 它取代了所有 XXX 或者 WWWWW 或者 TTTT 值到 NULL 最后 foldLeft 返回应用于dataframe的所有转换 b

相关问题