我是“分组”一个基于两个键的Dataframe, key1
, key2
. 然后我 collect_list
另一列 value
它返回一个嵌套列表,因为该列已经是 Seq
(如果该列为 Set
).
我需要把嵌套列表展平。
import org.apache.spark.sql.functions._
import spark.implicits._
case class Record(key1: String, key2: String, values: Seq[String])
val ds: Dataset[Record] = spark.createDataset(Seq(
Record("abc", "bca", Seq("one", "two", "three")),
Record("abc", "bca", Seq("three", "two", "one")),
Record("xyz", "xyz", Seq("four", "five", "six"))
))
ds.show(false)
/*
+----+----+-----------------+
|key1|key2|values |
+----+----+-----------------+
|abc |bca |[one, two, three]|
|abc |bca |[three, two, one]|
|xyz |xyz |[four, five, six]|
+----+----+-----------------+
* /
val firstDf: DataFrame = ds.groupBy($"key1", $"key2").agg(collect_list($"values").as("values"))
firstDf.show(false)
/* Column "value" is a nested list.
+----+----+--------------------------------------+
|key1|key2|values |
+----+----+--------------------------------------+
|xyz |xyz |[[four, five, six]] |
|abc |bca |[[one, two, three], [three, two, one]]|
+----+----+--------------------------------------+
/*
firstDf.printSchema()
root
|-- key1: string (nullable = true)
|-- key2: string (nullable = true)
|-- values: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
期望结果
允许重复值(列表语义)。但是在其他一些情况下,我不需要重复的条目(集合语义),所以让我们将答案设为泛型。
+----+----+----------------------------------+
|key1|key2|values |
+----+----+----------------------------------+
|xyz |xyz |[four, five, six] |
|abc |bca |[one, two, three, three, two, one]|
+----+----+----------------------------------+
Or alternatively:
+----+----+-----------------+
|key1|key2|values |
+----+----+-----------------+
|xyz |xyz |[four, five, six]|
|abc |bca |[one, two, three]|
+----+----+-----------------+
事实上,我正在寻找这样的解决方案:在列?中展平列表列表?。
我已经试过了 firstDf.select("values.*")
根据其他一些帖子,但我得到: org.apache.spark.sql.AnalysisException: Can only star expand struct data types. Attribute: ArrayBuffer(values);
事先谢谢你的帮助。
3条答案
按热度按时间lo8azlld1#
你要找的函数被完全调用了
flatten
:如果只需要不同的项,可以使用
array_distinct
:对于较旧的spark版本(<2.4),可以使用udf进行展平:
3qpi33ja2#
如果您不能使用RDDAPI,则可以使用另一种方法
flatten
以及array_distinct
功能:c7rzv4ha3#
这是另一种使用
explode
(如果您的数据太大,这可能会有问题)并避免flatten
等:若要获取不同的值,请替换
collect_list
与collect_set
.代码是pyspark,但是类似的东西也可以在scala中使用。