How to filter rows that have an array of structs?

wgmfuz8q posted on 2021-05-29 in Spark

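I have a DataFrame with an array of structs, and I only want to filter its columns; in other words, I want to select specific columns from the structs inside the array of structs. Is this possible, given that I am iterating over the rows? Schema: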

root
 |-- day: long (nullable = true)
 |-- table_row: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- DATE: string (nullable = true)
 |    |    |-- ADMISSION_NUM: string (nullable = true)
 |    |    |-- SOURCE_CODE: string (nullable = true)

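What I am doing is iterating over the rows, so can we select the array's struct columns row by row? I just want to know how this is possible: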

def keepColumnInarray(columns: Set[String], row: Row): Row = {
      //Some 
    }

For example, if I want to keep the column "DATE", then keepColumnInarray should select only that column.
Output schema:

root
 |-- day: long (nullable = true)
 |-- table_row: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- DATE: string (nullable = true)
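A sketch of how the `keepColumnInarray(columns: Set[String], row: Row): Row` stub above could be filled in, assuming each `row` carries its schema (as rows from `df.collect()` do) and that the array column is named `table_row` as in the schema. It is an illustration of per-row pruning with the public `Row`/`StructType` API, not the approach the answer below recommends; the field name `table_row` is hard-coded as an assumption.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types._

// Sketch: return a copy of `row` whose `table_row` structs keep only `columns`.
// Assumes `row` carries its schema and `table_row` is an array of structs.
def keepColumnInarray(columns: Set[String], row: Row): Row = {
  val arrIdx   = row.fieldIndex("table_row")
  val arrField = row.schema.fields(arrIdx)
  val arrType  = arrField.dataType.asInstanceOf[ArrayType]
  val elemType = arrType.elementType.asInstanceOf[StructType]

  // Element schema restricted to the requested columns, in the original field order.
  val keptType = StructType(elemType.fields.filter(f => columns.contains(f.name)))
  val keptIdx  = keptType.fieldNames.map(elemType.fieldIndex)

  // Rebuild every struct from the kept positions; a null array or element stays null.
  val pruned = row.get(arrIdx) match {
    case null => null
    case elems: scala.collection.Seq[_] =>
      elems.map {
        case null   => null
        case r: Row => new GenericRowWithSchema(keptIdx.map(i => r.get(i)), keptType)
      }
  }

  // Same top-level row, with `table_row` replaced and its type narrowed.
  val newFields = row.schema.fields.clone()
  newFields(arrIdx) = arrField.copy(dataType = ArrayType(keptType, arrType.containsNull))
  val values = row.toSeq.toArray
  values(arrIdx) = pruned
  new GenericRowWithSchema(values, StructType(newFields))
}
```

Because it works on one `Row` at a time, this fits code that already iterates over collected rows; for whole DataFrames the column-level approaches in the answer avoid the per-row overhead.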
Answer #1 (vulvrdjw)

Spark >= 2.4 (using the TRANSFORM higher-order function):

import org.apache.spark.sql.functions.expr

df.withColumn("table_row", expr("TRANSFORM(table_row, x -> named_struct('DATE', x.DATE))"))

This transforms

table_row: array
 |-- element: struct
 |    |-- DATE: string (nullable = true)
 |    |-- ADMISSION_NUM: string (nullable = true)
 |    |-- SOURCE_CODE: string (nullable = true)

into

table_row: array
 |-- element: struct
 |    |-- DATE: string (nullable = true)
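If more than one field has to survive, the same TRANSFORM/named_struct expression can be generated from a list of names. The helper below is hypothetical (it is not a Spark function) and assumes plain field names that need no backtick quoting.

```scala
// Hypothetical helper (not part of Spark): builds the TRANSFORM/named_struct
// expression from the answer for an arbitrary list of struct fields to keep.
// Assumes plain field names that need no backtick quoting.
def keepFieldsExpr(arrayCol: String, fields: Seq[String]): String = {
  val pairs = fields.map(f => s"'$f', x.$f").mkString(", ")
  s"TRANSFORM($arrayCol, x -> named_struct($pairs))"
}

println(keepFieldsExpr("table_row", Seq("DATE", "SOURCE_CODE")))
// prints TRANSFORM(table_row, x -> named_struct('DATE', x.DATE, 'SOURCE_CODE', x.SOURCE_CODE))
```

It would be used as `df.withColumn("table_row", expr(keepFieldsExpr("table_row", Seq("DATE"))))`. On Spark 3.0 and later the Scala API also offers `functions.transform(column, f)`, so the same projection can be written without a SQL string, e.g. `transform(col("table_row"), x => struct(x.getField("DATE").as("DATE")))`.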

Update 1 (based on the comments)

Spark < 2.4

Use the following UDF to select the columns:

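import scala.collection.mutable
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.functions.{array, col, expr, lit, udf}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

// Sample data shaped like the question's schema (`spark` is the active SparkSession).
val df = spark.range(2).withColumnRenamed("id", "day")
  .withColumn("table_row", expr("array(named_struct('DATE', 'sample_date'," +
    " 'ADMISSION_NUM', 'sample_adm_num', 'SOURCE_CODE', 'sample_source_code'))"))
df.show(false)
df.printSchema()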

//   
//   +---+---------------------------------------------------+
//   |day|table_row                                          |
//   +---+---------------------------------------------------+
//   |0  |[[sample_date, sample_adm_num, sample_source_code]]|
//   |1  |[[sample_date, sample_adm_num, sample_source_code]]|
//   +---+---------------------------------------------------+
//   
//   root
//   |-- day: long (nullable = false)
//   |-- table_row: array (nullable = false)
//   |    |-- element: struct (containsNull = false)
//   |    |    |-- DATE: string (nullable = false)
//   |    |    |-- ADMISSION_NUM: string (nullable = false)
//   |    |    |-- SOURCE_CODE: string (nullable = false)
//
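// Keeps only `columnsToKeep` in every struct of the array. Values and schema are
// both built in `columnsToKeep` order so they stay aligned; null arrays/elements stay null.
def keepColumnInarray(columnsToKeep: Seq[String], rows: mutable.WrappedArray[Row]) = {
  if (rows == null) null
  else rows.map { r =>
    if (r == null) null
    else new GenericRowWithSchema(
      columnsToKeep.map(name => r.get(r.fieldIndex(name))).toArray,
      StructType(columnsToKeep.map(name => r.schema(name))))
  }
}

// The declared return type must list the same fields, in the same order, as the
// column names passed at call time (here only DATE).
val keepColumns = udf((columnsToKeep: Seq[String], rows: mutable.WrappedArray[Row]) =>
  keepColumnInarray(columnsToKeep, rows),
  ArrayType(StructType(StructField("DATE", StringType) :: Nil)))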

val processedDF = df
  .withColumn("table_row_new", keepColumns(array(lit("DATE")), col("table_row")))
processedDF.show(false)
processedDF.printSchema()

//    
//    +---+---------------------------------------------------+---------------+
//    |day|table_row                                          |table_row_new  |
//    +---+---------------------------------------------------+---------------+
//    |0  |[[sample_date, sample_adm_num, sample_source_code]]|[[sample_date]]|
//    |1  |[[sample_date, sample_adm_num, sample_source_code]]|[[sample_date]]|
//    +---+---------------------------------------------------+---------------+
//    
//    root
//    |-- day: long (nullable = false)
//    |-- table_row: array (nullable = false)
//    |    |-- element: struct (containsNull = false)
//    |    |    |-- DATE: string (nullable = false)
//    |    |    |-- ADMISSION_NUM: string (nullable = false)
//    |    |    |-- SOURCE_CODE: string (nullable = false)
//    |-- table_row_new: array (nullable = true)
//    |    |-- element: struct (containsNull = true)
//    |    |    |-- DATE: string (nullable = true)
//
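The UDF above hard-codes its return type as a struct with only `DATE`, so passing other column names at call time would not change the output schema. A sketch of deriving that type from the input schema instead, using a hypothetical helper `prunedArrayType`; it assumes the values inside the UDF are built in the same order as `columnsToKeep`.

```scala
import org.apache.spark.sql.types._

// Hypothetical helper: derives the UDF return type from the input schema instead
// of hard-coding StructField("DATE", StringType). Fields come out in the order of
// `columnsToKeep`, so the UDF must build each struct's values in that same order.
def prunedArrayType(schema: StructType, arrayCol: String, columnsToKeep: Seq[String]): ArrayType = {
  val arrType  = schema(arrayCol).dataType.asInstanceOf[ArrayType]
  val elemType = arrType.elementType.asInstanceOf[StructType]
  // elemType(name) throws if a requested field does not exist.
  ArrayType(StructType(columnsToKeep.map(name => elemType(name))), arrType.containsNull)
}
```

With `val keep = Seq("DATE")`, one would pass `prunedArrayType(df.schema, "table_row", keep)` as the second argument of `udf(...)` and `array(keep.map(k => lit(k)): _*)` as the UDF's first column argument, so the declared type and the runtime column list come from the same `keep`.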
