Scala — programmatically extract columns from a struct column as separate columns

Asked by polhcujo on 2021-07-09 in Spark
Follow (0) | Answers (2) | Views (273)

I have a DataFrame as follows:

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types._

  val initialData = Seq(
    Row("ABC1", List(Row("Java", "XX", 120), Row("Scala", "XA", 300))),
    Row("Michael", List(Row("Java", "XY", 200), Row("Scala", "XB", 500))),
    Row("Robert", List(Row("Java", "XZ", 400), Row("Scala", "XC", 250)))
  )

  val arrayStructSchema = new StructType().add("name", StringType)
    .add("SortedDataSet", ArrayType(new StructType()
      .add("name", StringType)
      .add("author", StringType)
      .add("pages", IntegerType)))

  val df = spark
    .createDataFrame(spark.sparkContext.parallelize(initialData), arrayStructSchema)

  df.printSchema()
  df.show(5, false)

  +-------+-----------------------------------+
  |name   |SortedDataSet                      |
  +-------+-----------------------------------+
  |ABC1   |[[Java, XX, 120], [Scala, XA, 300]]|
  |Michael|[[Java, XY, 200], [Scala, XB, 500]]|
  |Robert |[[Java, XZ, 400], [Scala, XC, 250]]|
  +-------+-----------------------------------+

I now need to extract each element of the struct into a separate, indexed column, which I do as follows:

  val newDf = df.withColumn("Col1", sort_array('SortedDataSet).getItem(0))
    .withColumn("Col2", sort_array('SortedDataSet).getItem(1))
    .withColumn("name_1", $"Col1.name")
    .withColumn("author_1", $"Col1.author")
    .withColumn("pages_1", $"Col1.pages")
    .withColumn("name_2", $"Col2.name")
    .withColumn("author_2", $"Col2.author")
    .withColumn("pages_2", $"Col2.pages")

This is simple since I have only 2 arrays and 5 columns. What do I do when I have many arrays and columns?
How can I do this programmatically?


0tdrvxhp #1

If your arrays are all the same size, you can avoid the expensive explode, groupBy, and pivot by dynamically selecting the array and struct elements:

  val arrSize = df.select(size(col("SortedDataSet"))).first().getInt(0)

  val df2 = (1 to arrSize).foldLeft(df)(
    (d, i) =>
      d.withColumn(
        s"Col$i",
        sort_array(col("SortedDataSet"))(i - 1)
      )
  )

  val colNames = df.selectExpr("SortedDataSet[0] as tmp").select("tmp.*").columns
  // colNames: Array[String] = Array(name, author, pages)

  val colList = (1 to arrSize).map("Col" + _ + ".*").toSeq
  // colList: scala.collection.immutable.Seq[String] = Vector(Col1.*, Col2.*)

  val colRename = df2.columns ++ (
      for { x <- 1 to arrSize; y <- colNames }
        yield (x, y)
    ).map(
      x => x._2 + "_" + x._1
    ).toArray[String]
  // colRename: Array[String] = Array(name, SortedDataSet, Col1, Col2, name_1, author_1, pages_1, name_2, author_2, pages_2)

  val newDf = df2.select("*", colList: _*).toDF(colRename: _*)

  newDf.show(false)

  +-------+-----------------------------------+---------------+----------------+------+--------+-------+------+--------+-------+
  |name   |SortedDataSet                      |Col1           |Col2            |name_1|author_1|pages_1|name_2|author_2|pages_2|
  +-------+-----------------------------------+---------------+----------------+------+--------+-------+------+--------+-------+
  |ABC1   |[[Java, XX, 120], [Scala, XA, 300]]|[Java, XX, 120]|[Scala, XA, 300]|Java  |XX      |120    |Scala |XA      |300    |
  |Michael|[[Java, XY, 200], [Scala, XB, 500]]|[Java, XY, 200]|[Scala, XB, 500]|Java  |XY      |200    |Scala |XB      |500    |
  |Robert |[[Java, XZ, 400], [Scala, XC, 250]]|[Java, XZ, 400]|[Scala, XC, 250]|Java  |XZ      |400    |Scala |XC      |250    |
  +-------+-----------------------------------+---------------+----------------+------+--------+-------+------+--------+-------+
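The for-comprehension that builds the renamed headers is plain Scala, so it can be checked without a Spark session. A minimal sketch, assuming the struct fields and array size from the example above:

```scala
// Pure-Scala check of the header-generation logic (no Spark needed).
// structFields and arrSize are assumed to match the example schema.
val structFields = Array("name", "author", "pages")
val arrSize = 2

// One group of field names per array index: field_1, ..., field_N
val suffixed = (
  for { x <- 1 to arrSize; y <- structFields }
    yield y + "_" + x
).toArray

println(suffixed.mkString(", "))
// name_1, author_1, pages_1, name_2, author_2, pages_2
```

These are exactly the names appended to `df2.columns` before the final `toDF` call, so their order must match the order of the `Col$i.*` selections.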

x0fgdtte #2

One approach is to use posexplode, followed by a groupBy and pivot on the generated index, like below.
Given the sample dataset:

  df.show(false)
  // +-------+--------------------------------------------------+
  // |name   |SortedDataSet                                     |
  // +-------+--------------------------------------------------+
  // |ABC1   |[[Java, XX, 120], [Scala, XA, 300]]               |
  // |Michael|[[Java, XY, 200], [Scala, XB, 500], [Go, XD, 600]]|
  // |Robert |[[Java, XZ, 400], [Scala, XC, 250]]               |
  // +-------+--------------------------------------------------+

Note that I've slightly generalized the sample data to showcase arrays of uneven sizes.

  val flattenedDF = df.
    select($"name", posexplode($"SortedDataSet")).
    groupBy($"name").pivot($"pos" + 1).agg(
      first($"col.name").as("name"),
      first($"col.author").as("author"),
      first($"col.pages").as("pages")
    )

  flattenedDF.show
  // +-------+------+--------+-------+------+--------+-------+------+--------+-------+
  // |   name|1_name|1_author|1_pages|2_name|2_author|2_pages|3_name|3_author|3_pages|
  // +-------+------+--------+-------+------+--------+-------+------+--------+-------+
  // |   ABC1|  Java|      XX|    120| Scala|      XA|    300|  null|    null|   null|
  // |Michael|  Java|      XY|    200| Scala|      XB|    500|    Go|      XD|    600|
  // | Robert|  Java|      XZ|    400| Scala|      XC|    250|  null|    null|   null|
  // +-------+------+--------+-------+------+--------+-------+------+--------+-------+

Next, modify the column names created by pivot to match the wanted format:

  val pattern = "^\\d+_.*"
  val flattenedCols = flattenedDF.columns.filter(_ matches pattern)

  def colRenamed(c: String): String =
    c.split("_", 2).reverse.mkString("_")  // Split on first "_" and switch segments

  flattenedDF.
    select($"name" +: flattenedCols.map(c => col(c).as(colRenamed(c))): _*).
    show
  // +-------+------+--------+-------+------+--------+-------+------+--------+-------+
  // |   name|name_1|author_1|pages_1|name_2|author_2|pages_2|name_3|author_3|pages_3|
  // +-------+------+--------+-------+------+--------+-------+------+--------+-------+
  // |   ABC1|  Java|      XX|    120| Scala|      XA|    300|  null|    null|   null|
  // |Michael|  Java|      XY|    200| Scala|      XB|    500|    Go|      XD|    600|
  // | Robert|  Java|      XZ|    400| Scala|      XC|    250|  null|    null|   null|
  // +-------+------+--------+-------+------+--------+-------+------+--------+-------+
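Since the regex filter and `colRenamed` are pure string manipulation, they can be sanity-checked outside Spark. A small sketch, using a hypothetical hard-coded array standing in for `flattenedDF.columns`:

```scala
// Hypothetical stand-in for flattenedDF.columns (no Spark session needed)
val cols = Array("name", "1_name", "1_author", "1_pages", "2_name")

// Keep only pivot-generated columns of the form "<index>_<field>"
val pattern = "^\\d+_.*"
val flattenedCols = cols.filter(_ matches pattern)

// Same helper as above: swaps the prefix into a suffix, e.g. "1_name" -> "name_1"
def colRenamed(c: String): String =
  c.split("_", 2).reverse.mkString("_")

val renamed = flattenedCols.map(colRenamed)
println(renamed.mkString(", "))
// name_1, author_1, pages_1, name_2
```

Splitting with a limit of 2 matters here: it guarantees only the first underscore separates index from field name, so a field like `page_count` would survive as `page_count_1`.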
