spark:groupby和collect\u list,同时按另一列进行筛选

unftdfkk  于 2021-07-09  发布在  Spark
关注(0)|答案(2)|浏览(344)

我有以下Dataframe

+-----+-----+------+
|group|label|active|
+-----+-----+------+
|    a|    1|     y|
|    a|    2|     y|
|    a|    1|     n|
|    b|    1|     y|
|    b|    1|     n|
+-----+-----+------+

我想按“group”列分组,按“label”列收集,同时过滤active列中的值。
预期结果是

+-----+---------+---------+----------+
|group| labelyes| labelno |difference|
+-----+---------+---------+----------+
|a    | [1,2]   | [1]     | [2]      |
|b    | [1]     | [1]     | []       |
+-----+---------+---------+----------+

我可以很容易地得到过滤器的“y”标签

val dfyes = df.filter($"active" === "y").groupBy("group").agg(collect_set("label"))

对于“n”值也是如此

val dfno = df.filter($"active" === "n").groupBy("group").agg(collect_set("label"))

但我不明白在过滤时是否可以同时聚合,以及如何得到这两个集合的差异。

bwntbbo3

bwntbbo31#

谢谢@mck的帮助。我找到了另一种解决问题的方法,那就是用 when 聚合期间:

df
   .groupBy("group")
   .agg(
        collect_set(when($"active" === "y", $"label")).as("labelyes"), 
        collect_set(when($"active" === "n", $"label")).as("labelno")
       )
.withColumn("diff", array_except($"labelyes", $"labelno"))
m4pnthwp

m4pnthwp2#

您可以进行透视,并使用一些数组函数来获得差异:

val df2 = df.groupBy("group").pivot("active").agg(collect_list("label")).withColumn(
    "difference", 
    array_union(
        array_except(col("n"), col("y")), 
        array_except(col("y"), col("n"))
    )
)

df2.show
+-----+---+------+----------+
|group|  n|     y|difference|
+-----+---+------+----------+
|    b|[1]|   [1]|        []|
|    a|[1]|[1, 2]|       [2]|
+-----+---+------+----------+

相关问题