Scala: programmatically extracting columns from a struct column as separate columns

Asked by polhcujo on 2021-07-09 in Spark

I have a DataFrame as follows:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._  // needed for the $ / ' column syntax below; assumes an existing SparkSession named `spark`

val initialData = Seq(
    Row("ABC1",List(Row("Java","XX",120),Row("Scala","XA",300))),
    Row("Michael",List(Row("Java","XY",200),Row("Scala","XB",500))),
    Row("Robert",List(Row("Java","XZ",400),Row("Scala","XC",250)))
)

val arrayStructSchema = new StructType().add("name",StringType)
.add("SortedDataSet",ArrayType(new StructType()
.add("name",StringType)
.add("author",StringType)
.add("pages",IntegerType)))

val df = spark
.createDataFrame(spark.sparkContext.parallelize(initialData),arrayStructSchema)

df.printSchema()
df.show(5, false)

+-------+-----------------------------------+
|name   |SortedDataSet                      |
+-------+-----------------------------------+
|ABC1   |[[Java, XX, 120], [Scala, XA, 300]]|
|Michael|[[Java, XY, 200], [Scala, XB, 500]]|
|Robert |[[Java, XZ, 400], [Scala, XC, 250]]|
+-------+-----------------------------------+

I now need to extract each element of the struct as a separate, indexed column, which I would do as follows:

val newDf = df.withColumn("Col1", sort_array('SortedDataSet).getItem(0))
.withColumn("Col2", sort_array('SortedDataSet).getItem(1))
.withColumn("name_1",$"Col1.name")
.withColumn("author_1",$"Col1.author")
.withColumn("pages_1",$"Col1.pages")
.withColumn("name_2",$"Col2.name")
.withColumn("author_2",$"Col2.author")
.withColumn("pages_2",$"Col2.pages")

This is easy because I only have 2 array elements and 5 columns. What do I do when I have multiple array elements and many more columns?
How can I do this programmatically?

Answer #1, by 0tdrvxhp

If your arrays are all the same size, you can avoid the expensive explode, groupBy, and pivot by selecting the array and struct elements dynamically:

val arrSize = df.select(size(col("SortedDataSet"))).first().getInt(0)

val df2 = (1 to arrSize).foldLeft(df)(
    (d, i) => 
    d.withColumn(
        s"Col$i", 
        sort_array(col("SortedDataSet"))(i-1)
    )
)

val colNames = df.selectExpr("SortedDataSet[0] as tmp").select("tmp.*").columns
// colNames: Array[String] = Array(name, author, pages)

val colList = (1 to arrSize).map("Col" + _ + ".*").toSeq
// colList: scala.collection.immutable.Seq[String] = Vector(Col1.*, Col2.*)

val colRename = df2.columns ++ (
    for {x <- (1 to arrSize); y <- colNames} 
    yield (x,y)
).map(
    x => x._2 + "_" + x._1
).toArray[String]
// colRename: Array[String] = Array(name, SortedDataSet, Col1, Col2, name_1, author_1, pages_1, name_2, author_2, pages_2)

val newDf = df2.select("*", colList: _*).toDF(colRename: _*)

newDf.show(false)
+-------+-----------------------------------+---------------+----------------+------+--------+-------+------+--------+-------+
|name   |SortedDataSet                      |Col1           |Col2            |name_1|author_1|pages_1|name_2|author_2|pages_2|
+-------+-----------------------------------+---------------+----------------+------+--------+-------+------+--------+-------+
|ABC1   |[[Java, XX, 120], [Scala, XA, 300]]|[Java, XX, 120]|[Scala, XA, 300]|Java  |XX      |120    |Scala |XA      |300    |
|Michael|[[Java, XY, 200], [Scala, XB, 500]]|[Java, XY, 200]|[Scala, XB, 500]|Java  |XY      |200    |Scala |XB      |500    |
|Robert |[[Java, XZ, 400], [Scala, XC, 250]]|[Java, XZ, 400]|[Scala, XC, 250]|Java  |XZ      |400    |Scala |XC      |250    |
+-------+-----------------------------------+---------------+----------------+------+--------+-------+------+--------+-------+
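
Building on this answer, the same steps could be wrapped into a reusable helper so the flattening works for any array-of-struct column with a fixed per-row length. This is only a sketch, not part of the original answer; the function name flattenArrayOfStructs and its signature are made up for illustration, and it assumes every row's array is non-empty and of the same size, as the approach above already requires:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Hypothetical helper (not from the original answer): flattens an array-of-struct
// column into one column per (array position, struct field). Assumes every row's
// array is non-empty and has the same length.
def flattenArrayOfStructs(df: DataFrame, arrayCol: String): DataFrame = {
  // Number of elements per array, taken from the first row
  val arrSize = df.select(size(col(arrayCol))).first().getInt(0)

  // Field names of the struct elements, e.g. Array(name, author, pages)
  val fieldNames = df.selectExpr(s"$arrayCol[0] as tmp").select("tmp.*").columns

  // Add one struct column per array position: Col1, Col2, ...
  val withCols = (1 to arrSize).foldLeft(df) { (d, i) =>
    d.withColumn(s"Col$i", sort_array(col(arrayCol))(i - 1))
  }

  // Expand each ColN struct into its fields and rename them to <field>_N
  val expanded = (1 to arrSize).map(i => s"Col$i.*")
  val renamed  = withCols.columns ++ (
    for { i <- 1 to arrSize; f <- fieldNames } yield s"${f}_$i"
  )

  withCols.select("*", expanded: _*).toDF(renamed: _*)
}

Calling flattenArrayOfStructs(df, "SortedDataSet") should then produce the same result as newDf above.
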
Answer #2, by x0fgdtte

One approach is to use posexplode, followed by a groupBy and pivot on the resulting position index, as shown below.
Given the sample dataset:

df.show(false)
// +-------+--------------------------------------------------+
// |name   |SortedDataSet                                     |
// +-------+--------------------------------------------------+
// |ABC1   |[[Java, XX, 120], [Scala, XA, 300]]               |
// |Michael|[[Java, XY, 200], [Scala, XB, 500], [Go, XD, 600]]|
// |Robert |[[Java, XZ, 400], [Scala, XC, 250]]               |
// +-------+--------------------------------------------------+

Note that I've slightly generalized the sample data to illustrate arrays of uneven sizes.

val flattenedDF = df.
  select($"name", posexplode($"SortedDataSet")).
  groupBy($"name").pivot($"pos" + 1).agg(
    first($"col.name").as("name"),
    first($"col.author").as("author"),
    first($"col.pages").as("pages")
  )

flattenedDF.show
// +-------+------+--------+-------+------+--------+-------+------+--------+-------+
// |   name|1_name|1_author|1_pages|2_name|2_author|2_pages|3_name|3_author|3_pages|
// +-------+------+--------+-------+------+--------+-------+------+--------+-------+
// |   ABC1|  Java|      XX|    120| Scala|      XA|    300|  null|    null|   null|
// |Michael|  Java|      XY|    200| Scala|      XB|    500|    Go|      XD|    600|
// | Robert|  Java|      XZ|    400| Scala|      XC|    250|  null|    null|   null|
// +-------+------+--------+-------+------+--------+-------+------+--------+-------+

To rename the columns created by the pivot to the desired names:

val pattern = "^\\d+_.*"
val flattenedCols = flattenedDF.columns.filter(_ matches pattern)

def colRenamed(c: String): String =
  c.split("_", 2).reverse.mkString("_")  // Split on first "_" and switch segments

flattenedDF.
  select($"name" +: flattenedCols.map(c => col(c).as(colRenamed(c))): _*).
  show
// +-------+------+--------+-------+------+--------+-------+------+--------+-------+
// |   name|name_1|author_1|pages_1|name_2|author_2|pages_2|name_3|author_3|pages_3|
// +-------+------+--------+-------+------+--------+-------+------+--------+-------+
// |   ABC1|  Java|      XX|    120| Scala|      XA|    300|  null|    null|   null|
// |Michael|  Java|      XY|    200| Scala|      XB|    500|    Go|      XD|    600|
// | Robert|  Java|      XZ|    400| Scala|      XC|    250|  null|    null|   null|
// +-------+------+--------+-------+------+--------+-------+------+--------+-------+
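
The agg above hardcodes the struct fields name, author, and pages. If the struct's fields aren't known ahead of time, one possible variation (not from the original answer) is to read them from the schema and to pass the pivot values explicitly, which also saves Spark a pass over the data to discover them. A rough sketch under those assumptions, reusing the same df:

import org.apache.spark.sql.types.{ArrayType, StructType}
import org.apache.spark.sql.functions._

// Read the struct's field names from the schema instead of hardcoding them
val structFields = df.schema("SortedDataSet").dataType
  .asInstanceOf[ArrayType].elementType
  .asInstanceOf[StructType].fieldNames
// structFields: Array[String] = Array(name, author, pages)

// Largest array size across all rows, used as the explicit list of pivot values
val maxSize = df.select(max(size($"SortedDataSet"))).first().getInt(0)

// One `first(col.<field>) as <field>` aggregation per struct field
val aggExprs = structFields.map(f => first(col(s"col.$f")).as(f))

val flattenedDynamic = df.
  select($"name", posexplode($"SortedDataSet")).
  groupBy($"name").
  pivot($"pos" + 1, 1 to maxSize).   // explicit values skip the extra pass to collect distinct positions
  agg(aggExprs.head, aggExprs.tail: _*)

The rename step from the answer then applies unchanged to flattenedDynamic.
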
