Scala Spark: flattening an array of key/value structs

sdnqo3pr, published 2021-05-27 in Spark

I have an input DataFrame that contains an array-typed column. Each entry in the array is a struct consisting of a key (one of about four values) and a value. I want to turn this into a DataFrame with one column per possible key, holding null where that key is not present in the row's array. Keys are never duplicated within any array, but they may be out of order or missing.
The best I've come up with so far is:

val wantedCols = df.columns
  .filter(_ != arrayCol)
  .filter(_ != "col")
val flattened = df
        .select((wantedCols.map(col(_)) ++ Seq(explode(col(arrayCol)))):_*)
        .groupBy(wantedCols.map(col(_)):_*)
        .pivot("col.key")
        .agg(first("col.value"))

This does exactly what I want, but it's hideous, and I have no idea what the ramifications of grouping on every column but one might be. What's the right way to do this?
EDIT: Example input/output:

case class testStruct(name : String, number : String)
val dfExampleInput = Seq(
  (0, "KY", Seq(testStruct("A", "45"))),
  (1, "OR", Seq(testStruct("A", "30"), testStruct("B", "10"))))
  .toDF("index", "state", "entries")

dfExampleInput.show

+-----+-----+------------------+
|index|state|           entries|
+-----+-----+------------------+
|    0|   KY|         [[A, 45]]|
|    1|   OR|[[A, 30], [B, 10]]|
+-----+-----+------------------+

val dfExampleOutput = Seq(
  (0, "KY", "45", null),
  (1, "OR", "30", "10"))
  .toDF("index", "state", "A", "B")

dfExampleOutput.show

+-----+-----+---+----+
|index|state|  A|   B|
+-----+-----+---+----+
|    0|   KY| 45|null|
|    1|   OR| 30|  10|
+-----+-----+---+----+

FURTHER EDIT:
I've submitted a solution myself (see below) that handles this well as long as you know the keys in advance (in my case, I do). If finding the keys is an issue, another answer contains code to handle that.


66bbxpm51#

Without using groupBy, pivot, agg, or first. Please check the code below.

scala> val df = Seq((0, "KY", Seq(("A", "45"))),(1, "OR", Seq(("A", "30"),("B", "10")))).toDF("index", "state", "entries").withColumn("entries",$"entries".cast("array<struct<name:string,number:string>>"))
df: org.apache.spark.sql.DataFrame = [index: int, state: string ... 1 more field]

scala> df.printSchema
root
 |-- index: integer (nullable = false)
 |-- state: string (nullable = true)
 |-- entries: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- number: string (nullable = true)

scala> df.show(false)
+-----+-----+------------------+
|index|state|entries           |
+-----+-----+------------------+
|0    |KY   |[[A, 45]]         |
|1    |OR   |[[A, 30], [B, 10]]|
+-----+-----+------------------+

scala> val finalDFColumns = df.select(explode($"entries").as("entries")).select("entries.*").select("name").distinct.map(_.getAs[String](0)).orderBy($"value".asc).collect.foldLeft(df.limit(0))((cdf,c) => cdf.withColumn(c,lit(null))).columns
finalDFColumns: Array[String] = Array(index, state, entries, A, B)
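As a side note (not part of the original answer), the same column list can be built more directly by exploding only the name field. A sketch, assuming the same df and the spark-shell imports above:

scala> val keyNames = df.select(explode($"entries.name").as("name")).distinct.as[String].collect.sorted
keyNames: Array[String] = Array(A, B)

scala> val finalDFColumns = df.columns ++ keyNames
finalDFColumns: Array[String] = Array(index, state, entries, A, B)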

(Note: max, the length of the longest entries array, is used below but was never defined in the original snippet; the definition shown first is a likely reconstruction.)

scala> val max = df.select(org.apache.spark.sql.functions.max(size($"entries"))).head.getInt(0)
max: Int = 2

scala> val finalDF = df.select($"*" +: (0 until max).map(i => $"entries".getItem(i)("number").as(i.toString)): _*)
finalDF: org.apache.spark.sql.DataFrame = [index: int, state: string ... 3 more fields]

scala> finalDF.show(false)
+-----+-----+------------------+---+----+
|index|state|entries           |0  |1   |
+-----+-----+------------------+---+----+
|0    |KY   |[[A, 45]]         |45 |null|
|1    |OR   |[[A, 30], [B, 10]]|30 |10  |
+-----+-----+------------------+---+----+

scala> finalDF.printSchema
root
 |-- index: integer (nullable = false)
 |-- state: string (nullable = true)
 |-- entries: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- number: string (nullable = true)
 |-- 0: string (nullable = true)
 |-- 1: string (nullable = true)

scala> finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf,column) => fdf.withColumnRenamed(column._1,column._2)).show(false)
+-----+-----+------------------+---+----+
|index|state|entries           |A  |B   |
+-----+-----+------------------+---+----+
|0    |KY   |[[A, 45]]         |45 |null|
|1    |OR   |[[A, 30], [B, 10]]|30 |10  |
+-----+-----+------------------+---+----+


Final output:

scala> finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf,column) => fdf.withColumnRenamed(column._1,column._2)).drop($"entries").show(false)
+-----+-----+---+----+
|index|state|A  |B   |
+-----+-----+---+----+
|0    |KY   |45 |null|
|1    |OR   |30 |10  |
+-----+-----+---+----+

qncylg1j2#

I came up with a solution myself:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit, when}

// Returns the struct in array column `colName` whose `keyName` field equals
// `key`, or null when no entry matches; checks at most `numKeys` positions.
def extractFromArray(colName : String, key : String, numKeys : Int, keyName : String) = {
  val indexCols = (0 to numKeys-1).map(col(colName).getItem(_))
  // fold into a nested when/otherwise chain, starting from a null literal
  indexCols.foldLeft(lit(null))((innerCol : Column, indexCol : Column) =>
      when(indexCol.isNotNull && (indexCol.getItem(keyName) === key), indexCol)
      .otherwise(innerCol))
}
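
To see what the fold produces: for numKeys = 3, the function builds a nested when/otherwise chain equivalent to the following (a sketch; e(i) stands for col(colName).getItem(i), checked from the last index down):

val e = (i: Int) => col(colName).getItem(i)
when(e(2).isNotNull && (e(2).getItem(keyName) === key), e(2))
  .otherwise(when(e(1).isNotNull && (e(1).getItem(keyName) === key), e(1))
    .otherwise(when(e(0).isNotNull && (e(0).getItem(keyName) === key), e(0))
      .otherwise(lit(null))))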

Example:

case class testStruct(name : String, number : String)
val df = Seq(
  (0, "KY", Seq(testStruct("A", "45"))),
  (1, "OR", Seq(testStruct("A", "30"), testStruct("B", "10"))),
  (2, "FL", Seq(testStruct("A", "30"), testStruct("B", "10"), testStruct("C", "20"))),
  (3, "TX", Seq(testStruct("B", "60"), testStruct("A", "19"), testStruct("C", "40")))
)
.toDF("index", "state", "entries")
.withColumn("A", extractFromArray("entries", "B", 3, "name"))

df.show

This produces:

+-----+-----+--------------------+-------+
|index|state|             entries|      A|
+-----+-----+--------------------+-------+
|    0|   KY|           [[A, 45]]|   null|
|    1|   OR|  [[A, 30], [B, 10]]|[B, 10]|
|    2|   FL|[[A, 30], [B, 10]...|[B, 10]|
|    3|   TX|[[B, 60], [A, 19]...|[B, 60]|
+-----+-----+--------------------+-------+

This solution differs slightly from the other answers:
It works on only one key at a time.
It requires the key name and the number of keys to be known in advance.
It produces a column of structs rather than taking the extra step of extracting a specific value.
It works as a simple column-to-column operation rather than requiring a transformation of the entire DF.
It can be evaluated lazily.
The first three issues can be handled by the calling code, which leaves it somewhat more flexible for cases where you already know the keys or where the structs contain additional values to extract (see the sketch below).
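
For example, to get the bare value rather than the whole struct, one extra field extraction on top of the function above is enough (a sketch; B_number is a hypothetical column name):

val withValue = df.withColumn("B_number", extractFromArray("entries", "B", 3, "name").getField("number"))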


hfyxw5xn3#

I wouldn't worry too much about grouping by several columns, other than it making things a bit messy. In that vein, if there's a simpler, more maintainable way, go for it. Without example input/output I'm not sure whether this gets you where you're trying to go, but maybe it will be of use:

Seq(Seq("k1" -> "v1", "k2" -> "v2")).toDS() // some basic input based on my understanding of your description
  .select(explode($"value")) // flatten the array
  .select("col.*") // de-nest the struct
  .groupBy("_2") // one row per distinct value
  .pivot("_1") // one column per distinct key
  .count // or agg(first) if you want the value in each column
  .show
+---+----+----+
| _2|  k1|  k2|
+---+----+----+
| v2|null|   1|
| v1|   1|null|
+---+----+----+

Based on what you've now said, I get the impression that there are many "state"-like columns that aren't required for the aggregation but are needed in the final result.
For reference, if pivot weren't needed, you could add a struct column with all such fields nested within it, then add that struct to the aggregation, e.g. .agg(first($"myStruct"), first($"number")). The main advantage is only having to reference the actual key columns in the groupBy. But things get a little weird when pivot is involved, so we'll set that option aside; a minimal sketch of it follows for reference.
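
A minimal sketch of that non-pivot variant, assuming the example schema above (myStruct is just an illustrative name, with the usual spark.implicits._ and functions._ imports in scope):

val grouped = dfExampleInput
  .withColumn("myStruct", struct($"state"))                    // nest the pass-through columns
  .select($"index", $"myStruct", explode($"entries").as("entry"))
  .groupBy("index")                                            // only the real key column
  .agg(first($"myStruct").as("myStruct"), first($"entry.number").as("number"))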
The simplest way I can think of for this use case is to split the DataFrame and join it back together after the aggregation, using some row key. In this example I assume "index" is suitable for that purpose:

val mehCols = dfExampleInput.columns.filter(_ != "entries").map(col)
val mehDF = dfExampleInput.select(mehCols:_*)
val aggDF = dfExampleInput
  .select($"index", explode($"entries").as("entry"))
  .select($"index", $"entry.*")
  .groupBy("index")
  .pivot("name")
  .agg(first($"number"))

scala> mehDF.join(aggDF, Seq("index")).show
+-----+-----+---+----+
|index|state|  A|   B|
+-----+-----+---+----+
|    0|   KY| 45|null|
|    1|   OR| 30|  10|
+-----+-----+---+----+
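
If no natural row key like "index" exists, a synthetic one could be generated before splitting (a sketch; rowkey is a hypothetical column name):

val withKey = dfExampleInput.withColumn("rowkey", monotonically_increasing_id())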

If anything, I doubt you'll see much difference in performance. Maybe in extreme cases, e.g. very many meh columns, or very many pivot columns, or something like that, or maybe nothing at all. Personally, I'd test both with decently sized input, and if there's no significant difference, use whichever one seems easier to maintain.


vfhzx4xs4#

Here is another way, built on the assumption that there are no duplicates in the entries column, i.e. Seq(testStruct("A", "30"), testStruct("A", "70"), testStruct("B", "10")) would cause an error. The solution below combines the RDD and DataFrame APIs:

import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.types.StructType

case class testStruct(name : String, number : String)
val df = Seq(
  (0, "KY", Seq(testStruct("A", "45"))),
  (1, "OR", Seq(testStruct("A", "30"), testStruct("B", "10"))),
  (2, "FL", Seq(testStruct("A", "30"), testStruct("B", "10"), testStruct("C", "20"))),
  (3, "TX", Seq(testStruct("B", "60"), testStruct("A", "19"), testStruct("C", "40")))
)
.toDF("index", "state", "entries")
.cache

// get all possible keys from entries i.e Seq[A, B, C]
val finalCols = df.select(explode($"entries").as("entry"))
                  .select($"entry".getField("name").as("entry_name"))
                  .distinct
                  .collect
                  .map{_.getAs[String]("entry_name")}
                  .sorted // Attention: we need to retain the order of the columns 
                          // 1. when generating row values and
                          // 2. when creating the schema

val rdd = df.rdd.map{ r =>
  // transform the entries array into a map i.e Map(A -> 30, B -> 10)
  val entriesMap = r.getSeq[Row](2).map{r => (r.getString(0), r.getString(1))}.toMap

  // transform finalCols into a map with null value i.e Map(A -> null, B -> null, C -> null)
  val finalColsMap = finalCols.map{c => (c, null)}.toMap

  // replace null values with those that are present from the current row by merging the two previous maps
  // Attention: this should retain the order of finalColsMap
  val merged = finalColsMap ++ entriesMap

  // concatenate the two first row values ["index", "state"] with the values from merged
  val finalValues = Seq(r(0), r(1)) ++ merged.values

  Row.fromSeq(finalValues)
}

val extraCols = finalCols.map{c => s"`${c}` STRING"}
val schema = StructType.fromDDL("`index` INT, `state` STRING," + extraCols.mkString(","))

val finalDf = spark.createDataFrame(rdd, schema)

finalDf.show
// +-----+-----+---+----+----+
// |index|state|  A|   B|   C|
// +-----+-----+---+----+----+
// |    0|   KY| 45|null|null|
// |    1|   OR| 30|  10|null|
// |    2|   FL| 30|  10|  20|
// |    3|   TX| 19|  60|  40|
// +-----+-----+---+----+----+

Note: the solution requires one extra action to retrieve the unique keys, although it doesn't cause any shuffling since it is based only on narrow transformations.
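
One caveat worth adding: Scala's default immutable Map only guarantees insertion order for up to four entries, so with more keys the merged.values ordering may not line up with the schema. Driving the row construction off the sorted finalCols array avoids relying on map ordering entirely (a sketch of the relevant line inside the map):

val finalValues = Seq(r(0), r(1)) ++ finalCols.map(c => entriesMap.getOrElse(c, null))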
