spark dataframe将嵌套列表或集合列表中的列展平

zzoitvuj  于 2021-07-13  发布在  Spark
关注(0)|答案(3)|浏览(377)

我是“分组”一个基于两个键的Dataframe, key1 , key2 . 然后我 collect_list 另一列 value 它返回一个嵌套列表,因为该列已经是 Seq (如果该列为 Set ).
我需要把嵌套列表展平。

import org.apache.spark.sql.functions._
import spark.implicits._
case class Record(key1: String, key2: String, values: Seq[String])
val ds: Dataset[Record] = spark.createDataset(Seq(
  Record("abc", "bca", Seq("one", "two", "three")),
  Record("abc", "bca", Seq("three", "two", "one")),
  Record("xyz", "xyz", Seq("four", "five", "six"))
))

ds.show(false)
/*
+----+----+-----------------+
|key1|key2|values           |
+----+----+-----------------+
|abc |bca |[one, two, three]|
|abc |bca |[three, two, one]|
|xyz |xyz |[four, five, six]|
+----+----+-----------------+

* /

val firstDf: DataFrame = ds.groupBy($"key1", $"key2").agg(collect_list($"values").as("values"))
firstDf.show(false)
/* Column "value" is a nested list.
+----+----+--------------------------------------+
|key1|key2|values                                |
+----+----+--------------------------------------+
|xyz |xyz |[[four, five, six]]                   |
|abc |bca |[[one, two, three], [three, two, one]]|
+----+----+--------------------------------------+
/*
firstDf.printSchema()
root
 |-- key1: string (nullable = true)
 |-- key2: string (nullable = true)
 |-- values: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

期望结果
允许重复值(列表语义)。但是在其他一些情况下,我不需要重复的条目(集合语义),所以让我们将答案设为泛型。

+----+----+----------------------------------+
|key1|key2|values                            |
+----+----+----------------------------------+
|xyz |xyz |[four, five, six]                 |
|abc |bca |[one, two, three, three, two, one]|
+----+----+----------------------------------+
Or alternatively:
+----+----+-----------------+
|key1|key2|values           |
+----+----+-----------------+
|xyz |xyz |[four, five, six]|
|abc |bca |[one, two, three]|
+----+----+-----------------+

事实上,我正在寻找这样的解决方案:在列?中展平列表列表?。
我已经试过了 firstDf.select("values.*") 根据其他一些帖子,但我得到: org.apache.spark.sql.AnalysisException: Can only star expand struct data types. Attribute: ArrayBuffer(values); 事先谢谢你的帮助。

lo8azlld

lo8azlld1#

你要找的函数被完全调用了 flatten :

val firstDf = ds.groupBy($"key1", $"key2").agg(flatten(collect_list($"values")).as("values"))

firstDf.show(false)
+----+----+----------------------------------+
|key1|key2|values                            |
+----+----+----------------------------------+
|xyz |xyz |[four, five, six]                 |
|abc |bca |[one, two, three, three, two, one]|
+----+----+----------------------------------+

如果只需要不同的项,可以使用 array_distinct :

val firstDf = ds.groupBy($"key1", $"key2").agg(array_distinct(flatten(collect_list($"values"))).as("values"))

firstDf.show(false)
+----+----+-----------------+
|key1|key2|values           |
+----+----+-----------------+
|xyz |xyz |[four, five, six]|
|abc |bca |[one, two, three]|
+----+----+-----------------+

对于较旧的spark版本(<2.4),可以使用udf进行展平:

def flattenudf = udf((x: Seq[Seq[String]]) => x.flatten)
// if you want distinct elements,
// def flattenudf = udf((x: Seq[Seq[String]]) => x.flatten.distinct)
val firstDf = ds.groupBy($"key1", $"key2").agg(flattenudf(collect_list($"values")).as("values"))
3qpi33ja

3qpi33ja2#

如果您不能使用RDDAPI,则可以使用另一种方法 flatten 以及 array_distinct 功能:

val final_df = firstDF.rdd.map { 
  r => (r.getString(0), r.getString(1), r.getAs[Seq[Seq[String]]](2).flatten.distinct)
}.toDF("key1", "key2", "values")

final_df.show

//+----+----+-----------------+
//|key1|key2|           values|
//+----+----+-----------------+
//| xyz| xyz|[four, five, six]|
//| abc| bca|[one, two, three]|
//+----+----+-----------------+
c7rzv4ha

c7rzv4ha3#

这是另一种使用 explode (如果您的数据太大,这可能会有问题)并避免 flatten 等:

from pyspark.sql import functions as F

df = df.select('key1', 'key2', F.explode('values').alias('values'))
       .groupby('key1', 'key2')
       .agg(F.collect_list('values'))

若要获取不同的值,请替换 collect_listcollect_set .
代码是pyspark,但是类似的东西也可以在scala中使用。

相关问题