scala spark:展平键/值结构的数组

sdnqo3pr  于 2021-05-27  发布在  Spark
关注(0)|答案(4)|浏览(378)

我有一个包含数组类型列的输入Dataframe。数组中的每个条目都是一个结构,由一个键(大约四个值中的一个)和一个值组成。我想把它转换成一个Dataframe,每个可能的键对应一列,如果该值不在该行的数组中,则为空。任何数组中的键都不会重复,但它们可能会无序或丢失。
到目前为止,我得到的最好的是

  1. val wantedCols =df.columns
  2. .filter(_ != arrayCol)
  3. .filter(_ != "col")
  4. val flattened = df
  5. .select((wantedCols.map(col(_)) ++ Seq(explode(col(arrayCol)))):_*)
  6. .groupBy(wantedCols.map(col(_)):_*)
  7. .pivot("col.key")
  8. .agg(first("col.value"))

这正是我想要的,但它是可怕的,我不知道什么样的分支分组,每一列,但一个。正确的方法是什么?
编辑:输入/输出示例:

  1. case class testStruct(name : String, number : String)
  2. val dfExampleInput = Seq(
  3. (0, "KY", Seq(testStruct("A", "45"))),
  4. (1, "OR", Seq(testStruct("A", "30"), testStruct("B", "10"))))
  5. .toDF("index", "state", "entries")
  6. .show
  7. +-----+-----+------------------+
  8. |index|state| entries|
  9. +-----+-----+------------------+
  10. | 0| KY| [[A, 45]]|
  11. | 1| OR|[[A, 30], [B, 10]]|
  12. +-----+-----+------------------+
  13. val dfExampleOutput = Seq(
  14. (0, "KY", "45", null),
  15. (1, "OR", "30", "10"))
  16. .toDF("index", "state", "A", "B")
  17. .show
  18. +-----+-----+---+----+
  19. |index|state| A| B|
  20. +-----+-----+---+----+
  21. | 0| KY| 45|null|
  22. | 1| OR| 30| 10|
  23. +-----+-----+---+----+

进一步编辑:
我自己提交了一个解决方案(见下文),只要您事先知道密钥(在我的例子中,我知道),就可以很好地处理这个问题。如果找到密钥是一个问题,另一个答案包含处理该问题的代码。

66bbxpm5

66bbxpm51#

没有
groupBy pivot agg first 请检查下面的代码。

  1. scala> val df = Seq((0, "KY", Seq(("A", "45"))),(1, "OR", Seq(("A", "30"),("B", "10")))).toDF("index", "state", "entries").withColumn("entries",$"entries".cast("array<struct<name:string,number:string>>"))
  2. df: org.apache.spark.sql.DataFrame = [index: int, state: string ... 1 more field]
  3. scala> df.printSchema
  4. root
  5. |-- index: integer (nullable = false)
  6. |-- state: string (nullable = true)
  7. |-- entries: array (nullable = true)
  8. | |-- element: struct (containsNull = true)
  9. | | |-- name: string (nullable = true)
  10. | | |-- number: string (nullable = true)
  11. scala> df.show(false)
  12. +-----+-----+------------------+
  13. |index|state|entries |
  14. +-----+-----+------------------+
  15. |0 |KY |[[A, 45]] |
  16. |1 |OR |[[A, 30], [B, 10]]|
  17. +-----+-----+------------------+
  18. scala> val finalDFColumns = df.select(explode($"entries").as("entries")).select("entries.*").select("name").distinct.map(_.getAs[String](0)).orderBy($"value".asc).collect.foldLeft(df.limit(0))((cdf,c) => cdf.withColumn(c,lit(null))).columns
  19. finalDFColumns: Array[String] = Array(index, state, entries, A, B)
  20. scala> val finalDF = df.select($"*" +: (0 until max).map(i => $"entries".getItem(i)("number").as(i.toString)): _*)
  21. finalDF: org.apache.spark.sql.DataFrame = [index: int, state: string ... 3 more fields]
  22. scala> finalDF.show(false)
  23. +-----+-----+------------------+---+----+
  24. |index|state|entries |0 |1 |
  25. +-----+-----+------------------+---+----+
  26. |0 |KY |[[A, 45]] |45 |null|
  27. |1 |OR |[[A, 30], [B, 10]]|30 |10 |
  28. +-----+-----+------------------+---+----+
  29. scala> finalDF.printSchema
  30. root
  31. |-- index: integer (nullable = false)
  32. |-- state: string (nullable = true)
  33. |-- entries: array (nullable = true)
  34. | |-- element: struct (containsNull = true)
  35. | | |-- name: string (nullable = true)
  36. | | |-- number: string (nullable = true)
  37. |-- 0: string (nullable = true)
  38. |-- 1: string (nullable = true)
  39. scala> finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf,column) => fdf.withColumnRenamed(column._1,column._2)).show(false)
  40. +-----+-----+------------------+---+----+
  41. |index|state|entries |A |B |
  42. +-----+-----+------------------+---+----+
  43. |0 |KY |[[A, 45]] |45 |null|
  44. |1 |OR |[[A, 30], [B, 10]]|30 |10 |
  45. +-----+-----+------------------+---+----+
  46. scala>

最终输出

  1. scala> finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf,column) => fdf.withColumnRenamed(column._1,column._2)).drop($"entries").show(false)
  2. +-----+-----+---+----+
  3. |index|state|A |B |
  4. +-----+-----+---+----+
  5. |0 |KY |45 |null|
  6. |1 |OR |30 |10 |
  7. +-----+-----+---+----+
展开查看全部
qncylg1j

qncylg1j2#

我自己想出了一个解决办法:

  1. def extractFromArray(colName : String, key : String, numKeys : Int, keyName : String) = {
  2. val indexCols = (0 to numKeys-1).map(col(colName).getItem(_))
  3. indexCols.foldLeft(lit(null))((innerCol : Column, indexCol : Column) =>
  4. when(indexCol.isNotNull && (indexCol.getItem(keyName) === key), indexCol)
  5. .otherwise(innerCol))
  6. }

例子:

  1. case class testStruct(name : String, number : String)
  2. val df = Seq(
  3. (0, "KY", Seq(testStruct("A", "45"))),
  4. (1, "OR", Seq(testStruct("A", "30"), testStruct("B", "10"))),
  5. (2, "FL", Seq(testStruct("A", "30"), testStruct("B", "10"), testStruct("C", "20"))),
  6. (3, "TX", Seq(testStruct("B", "60"), testStruct("A", "19"), testStruct("C", "40")))
  7. )
  8. .toDF("index", "state", "entries")
  9. .withColumn("A", extractFromArray("entries", "B", 3, "name"))
  10. .show

产生:

  1. +-----+-----+--------------------+-------+
  2. |index|state| entries| A|
  3. +-----+-----+--------------------+-------+
  4. | 0| KY| [[A, 45]]| null|
  5. | 1| OR| [[A, 30], [B, 10]]|[B, 10]|
  6. | 2| FL|[[A, 30], [B, 10]...|[B, 10]|
  7. | 3| TX|[[B, 60], [A, 19]...|[B, 60]|
  8. +-----+-----+--------------------+-------+

此解决方案与其他答案稍有不同:
它一次只能对一个键起作用
它要求预先知道密钥名称和密钥数
它生成一列结构,而不是执行提取特定值的额外步骤
它作为一个简单的列到列操作工作,而不需要对整个df进行转换
它可以懒散地评估
前三个问题可以通过调用代码来处理,对于您已经知道键或者结构包含要提取的附加值的情况,可以让代码更加灵活。

展开查看全部
hfyxw5xn

hfyxw5xn3#

我不会太担心按几个列分组,只会让事情变得混乱。在这种情况下,如果有更简单、更易维护的方法,那就去做吧。如果没有示例输入/输出,我不确定这是否能让您达到您想要达到的目的,但也许它会有用:

  1. Seq(Seq("k1" -> "v1", "k2" -> "v2")).toDS() // some basic input based on my understanding of your description
  2. .select(explode($"value")) // flatten the array
  3. .select("col.*") // de-nest the struct
  4. .groupBy("_2") // one row per distinct value
  5. .pivot("_1") // one column per distinct key
  6. .count // or agg(first) if you want the value in each column
  7. .show
  8. +---+----+----+
  9. | _2| k1| k2|
  10. +---+----+----+
  11. | v2|null| 1|
  12. | v1| 1|null|
  13. +---+----+----+

根据您现在所说的内容,我得到的印象是,有许多类似“state”的列不是聚合所必需的,但需要在最终结果中。
作为参考,如果不需要透视,可以添加一个嵌套了所有此类字段的结构列,然后将其添加到聚合中,例如: .agg(first($"myStruct"), first($"number")) . 其主要优点是只在中引用实际的键列 groubBy . 但是当使用pivot时,事情变得有点奇怪,所以我们将把这个选项放在一边。
在这个用例中,我能想到的最简单的方法是拆分Dataframe,并在聚合之后使用一些rowkey将其重新连接在一起。在这个例子中,我假设 "index" 适用于此目的:

  1. val mehCols = dfExampleInput.columns.filter(_ != "entries").map(col)
  2. val mehDF = dfExampleInput.select(mehCols:_*)
  3. val aggDF = dfExampleInput
  4. .select($"index", explode($"entries").as("entry"))
  5. .select($"index", $"entry.*")
  6. .groupBy("index")
  7. .pivot("name")
  8. .agg(first($"number"))
  9. scala> mehDF.join(aggDF, Seq("index")).show
  10. +-----+-----+---+----+
  11. |index|state| A| B|
  12. +-----+-----+---+----+
  13. | 0| KY| 45|null|
  14. | 1| OR| 30| 10|
  15. +-----+-----+---+----+

如果有的话,我怀疑你在表现上会看到很大的不同。可能在极端情况下,例如:非常多 meh 列,或者很多透视列,或者类似的,或者什么都没有。就我个人而言,我会用适当大小的输入来测试这两种方法,如果没有显著的差异,就用更容易维护的方法。

展开查看全部
vfhzx4xs

vfhzx4xs4#

这是另一种方法,它是建立在假设的基础上,没有重复的 entries 列,即 Seq(testStruct("A", "30"), testStruct("A", "70"), testStruct("B", "10")) 会导致错误。下一个解决方案结合了rdd和dataframe API来实现:

  1. import org.apache.spark.sql.functions.explode
  2. import org.apache.spark.sql.types.StructType
  3. case class testStruct(name : String, number : String)
  4. val df = Seq(
  5. (0, "KY", Seq(testStruct("A", "45"))),
  6. (1, "OR", Seq(testStruct("A", "30"), testStruct("B", "10"))),
  7. (2, "FL", Seq(testStruct("A", "30"), testStruct("B", "10"), testStruct("C", "20"))),
  8. (3, "TX", Seq(testStruct("B", "60"), testStruct("A", "19"), testStruct("C", "40")))
  9. )
  10. .toDF("index", "state", "entries")
  11. .cache
  12. // get all possible keys from entries i.e Seq[A, B, C]
  13. val finalCols = df.select(explode($"entries").as("entry"))
  14. .select($"entry".getField("name").as("entry_name"))
  15. .distinct
  16. .collect
  17. .map{_.getAs[String]("entry_name")}
  18. .sorted // Attention: we need to retain the order of the columns
  19. // 1. when generating row values and
  20. // 2. when creating the schema
  21. val rdd = df.rdd.map{ r =>
  22. // transform the entries array into a map i.e Map(A -> 30, B -> 10)
  23. val entriesMap = r.getSeq[Row](2).map{r => (r.getString(0), r.getString(1))}.toMap
  24. // transform finalCols into a map with null value i.e Map(A -> null, B -> null, C -> null)
  25. val finalColsMap = finalCols.map{c => (c, null)}.toMap
  26. // replace null values with those that are present from the current row by merging the two previous maps
  27. // Attention: this should retain the order of finalColsMap
  28. val merged = finalColsMap ++ entriesMap
  29. // concatenate the two first row values ["index", "state"] with the values from merged
  30. val finalValues = Seq(r(0), r(1)) ++ merged.values
  31. Row.fromSeq(finalValues)
  32. }
  33. val extraCols = finalCols.map{c => s"`${c}` STRING"}
  34. val schema = StructType.fromDDL("`index` INT, `state` STRING," + extraCols.mkString(","))
  35. val finalDf = spark.createDataFrame(rdd, schema)
  36. finalDf.show
  37. // +-----+-----+---+----+----+
  38. // |index|state| A| B| C|
  39. // +-----+-----+---+----+----+
  40. // | 0| KY| 45|null|null|
  41. // | 1| OR| 30| 10|null|
  42. // | 2| FL| 30| 10| 20|
  43. // | 3| TX| 19| 60| 40|
  44. // +-----+-----+---+----+----+

注意:该解决方案需要一个额外的操作来检索唯一键,尽管它不会导致任何洗牌,因为它只基于窄变换。

展开查看全部

相关问题