How to convert a Spark DataFrame into a list of structs in Scala

Asked by edqdpe6u on 2021-05-19 · tagged: Spark

I have a Spark DataFrame made up of 12 rows and a number of columns, 22 in this example.
I want to convert it into a DataFrame with the following schema:

root
 |-- data: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- ast: double (nullable = true)
 |    |    |-- blk: double (nullable = true)
 |    |    |-- dreb: double (nullable = true)
 |    |    |-- fg3_pct: double (nullable = true)
 |    |    |-- fg3a: double (nullable = true)
 |    |    |-- fg3m: double (nullable = true)
 |    |    |-- fg_pct: double (nullable = true)
 |    |    |-- fga: double (nullable = true)
 |    |    |-- fgm: double (nullable = true)
 |    |    |-- ft_pct: double (nullable = true)
 |    |    |-- fta: double (nullable = true)
 |    |    |-- ftm: double (nullable = true)
 |    |    |-- games_played: long (nullable = true)
 |    |    |-- seconds: double (nullable = true)
 |    |    |-- oreb: double (nullable = true)
 |    |    |-- pf: double (nullable = true)
 |    |    |-- player_id: long (nullable = true)
 |    |    |-- pts: double (nullable = true)
 |    |    |-- reb: double (nullable = true)
 |    |    |-- season: long (nullable = true)
 |    |    |-- stl: double (nullable = true)
 |    |    |-- turnover: double (nullable = true)

where each element of the data array corresponds to a different row of the original DataFrame.
The final goal is to export this to a .json file with the following format:

{"data": [{row1}, {row2}, ..., {row12}]}

The code I am currently using is the following:

val best_12_struct = best_12.withColumn("data", array((0 to 11).map(i => struct(col("ast"), col("blk"), col("dreb"), col("fg3_pct"), col("fg3a"), 
                                                                   col("fg3m"), col("fg_pct"), col("fga"), col("fgm"), 
                                                                   col("ft_pct"), col("fta"), col("ftm"), col("games_played"), 
                                                                   col("seconds"), col("oreb"), col("pf"), col("player_id"), 
                                                                   col("pts"), col("reb"), col("season"), col("stl"), col("turnover"))) : _*))

val best_12_data = best_12_struct.select("data")

However, array((0 to 11).map(...) : _*) just copies the current row's struct 12 times into data. As a result, the .json output contains 12 {"data": ...} objects, each repeating a single row 12 times, instead of a single {"data": ...} object with 12 elements, one per row of the original DataFrame.

Answer #1 · q35jwt9p

You get the same row 12 times because withColumn only picks information from the row it is currently processing.
You need to aggregate at the DataFrame level with collect_list, which is an aggregation function, like this:

import org.apache.spark.sql.functions._

val best_12_data = best_12
  // Pack the 22 stat columns of each row into a single struct column.
  .withColumn("row", struct(
    col("ast"), col("blk"), col("dreb"), col("fg3_pct"), col("fg3a"),
    col("fg3m"), col("fg_pct"), col("fga"), col("fgm"), col("ft_pct"),
    col("fta"), col("ftm"), col("games_played"), col("seconds"),
    col("oreb"), col("pf"), col("player_id"), col("pts"), col("reb"),
    col("season"), col("stl"), col("turnover")))
  // Collapse all rows into a single row holding an array of those structs.
  .agg(collect_list(col("row")).as("data"))
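
As a side note beyond the original answer: if enumerating all 22 columns by hand is undesirable, the struct can be built from the DataFrame's own column list. A sketch under the assumption that every column of best_12 belongs in the struct:

import org.apache.spark.sql.functions._

// Build the struct from whatever columns best_12 currently has,
// so the column list does not have to be maintained by hand.
val best_12_data = best_12
  .withColumn("row", struct(best_12.columns.map(col): _*))
  .agg(collect_list(col("row")).as("data"))

Since collect_list gathers every row into one array on a single row, this pattern is only suitable for small DataFrames such as the 12-row one here.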
