Merge n rows of a DataFrame containing duplicate values

zfycwa2u  posted on 2021-05-27  in Spark
Follow (0) | Answers (2) | Views (476)

I have a DataFrame like the following:

Id    linkedIn
1     [l1,l2]
2     [l5,l6,l3]
3     [l4,l5]
4     [l8,l10]
5     [l7,l9,l1]

If we look at rows 1 and 5, they share l1, so those two rows should be merged into a single row with id = 1. Similarly, rows 2 and 3 share l5, so they should be merged into a single row with id = 2. Row 4 stays as it is, since it shares nothing with any other row.
I would like the output to be:

Id    linkedIn
1     [l1,l2,l7,l9]
2     [l4,l5,l6,l3]
4     [l8,l10]

I am using Spark 2.3.
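Before the answers, the intended semantics can be pinned down with a small plain-Scala sketch (no Spark): treat shared elements as links between rows, merge connected rows, and key each merged row by its smallest id. The helper names (`mergeRows`, `find`, `union`) are mine, and I am assuming merges should also chain transitively, which the example data happens not to exercise:

```scala
// Plain-Scala sketch of the desired merge via union-find (names are illustrative).
def mergeRows(rows: Seq[(Int, Seq[String])]): Seq[(Int, Seq[String])] = {
  val ids = rows.map(_._1)
  val parent = scala.collection.mutable.Map(ids.map(i => i -> i): _*)
  // find with path compression
  def find(i: Int): Int =
    if (parent(i) == i) i else { val r = find(parent(i)); parent(i) = r; r }
  // always keep the smaller root so the merged row gets the smallest id
  def union(a: Int, b: Int): Unit = {
    val (ra, rb) = (find(a), find(b))
    if (ra != rb) parent(math.max(ra, rb)) = math.min(ra, rb)
  }
  // union every pair of ids that share a linkedIn value
  rows.flatMap { case (id, vs) => vs.map(v => (v, id)) }
      .groupBy(_._1).values
      .foreach(g => g.map(_._2).sliding(2).foreach {
        case Seq(a, b) => union(a, b)
        case _         => // a value held by only one row: nothing to merge
      })
  // collapse each connected group into one row keyed by its root id
  rows.groupBy { case (id, _) => find(id) }
      .map { case (root, grp) => (root, grp.flatMap(_._2).distinct) }
      .toSeq.sortBy(_._1)
}

val rows = Seq(
  (1, Seq("l1", "l2")),
  (2, Seq("l5", "l6", "l3")),
  (3, Seq("l4", "l5")),
  (4, Seq("l8", "l10")),
  (5, Seq("l7", "l9", "l1"))
)
println(mergeRows(rows))
```

On the question's data this yields groups {1, 5}, {2, 3}, and {4}, matching the expected output up to element order within each list.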


5cnsuln7 1#

For two-row commonality, you can try the code below.

import spark.implicits._
import org.apache.spark.sql.functions._

val df = Seq(
  (1, Seq("l1", "l2")),
  (2, Seq("l5", "l6", "l3")),
  (3, Seq("l4", "l5")),
  (4, Seq("l8", "l10")),
  (5, Seq("l7", "l9", "l1"))
).toDF("id", "values")

// one row per (id, value) pair
val df2 = df.select('id, explode('values).as("value"))
// self-join on value: every pair of ids sharing at least one element
val df3 = df2.join(df2, "value").toDF("value", "id", "id2")
// fingerprint each id by the set of ids it overlaps with;
// sort_array makes the hash independent of collect_set's non-deterministic order
val df4 = df3.groupBy('id).agg(hash(sort_array(collect_set('id2))).as("hash"))
// gather all values belonging to each overlap group
val df5 = df2.join(df4, "id").groupBy('hash).agg(collect_set('value).as("values"))
// keep the smallest id as the group's id
val df6 = df5.join(df4.groupBy('hash).agg(min('id).as("id")), "hash")
  .select('id, 'values).orderBy('id)
df6.show()

Output:

+---+----------------+
| id|          values|
+---+----------------+
|  1|[l7, l9, l2, l1]|
|  2|[l4, l3, l6, l5]|
|  4|       [l8, l10]|
+---+----------------+
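Restating the trick above in plain Scala collections may help: an id's group is determined by the exact set of ids it overlaps with, which is the role played by hashing the collected `id2` set. The names `data`, `neighbors`, and `merged` below are illustrative only, using the question's five rows:

```scala
// The first answer's grouping idea without Spark (illustrative names).
val data = Map(
  1 -> Set("l1", "l2"), 2 -> Set("l5", "l6", "l3"), 3 -> Set("l4", "l5"),
  4 -> Set("l8", "l10"), 5 -> Set("l7", "l9", "l1"))

// "neighbors" of an id: every id (itself included) sharing at least one
// value with it, mirroring the self-join on the exploded value column
def neighbors(id: Int): Set[Int] =
  data.keySet.filter(other => (data(id) & data(other)).nonEmpty)

val merged = data.keySet
  .groupBy(neighbors)                               // same role as hash(collect_set('id2))
  .map { case (_, ids) => ids.min -> ids.flatMap(data) } // smallest id keys the group

println(merged.toSeq.sortBy(_._1))
```

Here ids 1 and 5 both have neighbor set {1, 5}, and 2 and 3 both have {2, 3}, so each pair collapses into one entry while id 4 stands alone.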

3wabscal 2#

Another approach (I like the one above too, but have not tested it). This solution takes performance into account, and I added my own data:

import spark.implicits._
import org.apache.spark.sql.functions._

val distinctUDF = udf( (s: Seq[String]) => s.distinct ) // courtesy of LeoC

val df = Seq( (1, Array("l1", "l2", "l700")),
          (2, Array("l5", "l6", "l3")),
          (3, Array("l4", "l5")),
          (4, Array("l8", "l10")),
          (5, Array("l7", "l8", "l1", "l700")) ).toDF("k", "lv")

val df2 = df.withColumn("lv", explode($"lv")).repartition($"lv") // 200 partitions by default

// collect_set() keeps distinct elements; collect_list() keeps all elements (except nulls)
val df3 = df2.groupBy("lv").agg(collect_list("k").as("kv"))
// keep only values shared by more than one row
val df4 = df3.filter(size($"kv") > 1).select("kv").distinct
val df5 = df4.withColumn("j", explode($"kv"))
// join back to the original rows to gather each group's arrays
val df6 = df5.join(df, df5("j") === df("k"))
val df7 = df6.groupBy("kv").agg(collect_set("lv").as("lv"))

// note: array_min and flatten require Spark 2.4+; on 2.3 they would need UDFs
df7.withColumn("key", array_min($"kv"))
  .withColumn("values", distinctUDF(flatten($"lv")))
  .select("key", "values").show(false)
// You can order the output as you wish; lazy evaluation fuses these steps

Result (for this data set):

+---+-----------------------+
|key|values                 |
+---+-----------------------+
|2  |[l4, l5, l6, l3]       |
|1  |[l1, l2, l700, l7, l8] |
|4  |[l8, l10, l7, l1, l700]|
+---+-----------------------+
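A caveat that applies to both answers (the first says as much with "two-row commonality"): they merge rows that share a value directly, so a chain where row 1 overlaps row 2, and row 2 overlaps row 3, but rows 1 and 3 share nothing, is not collapsed into one group. A tiny plain-Scala illustration with made-up data:

```scala
// Chain case: 1 and 2 share "b", 2 and 3 share "c", but 1 and 3 share nothing.
val chain = Map(1 -> Set("a", "b"), 2 -> Set("b", "c"), 3 -> Set("c", "d"))

// direct-overlap neighbor set, as in the answers' single-pass grouping
def overlaps(m: Map[Int, Set[String]], x: Int): Set[Int] =
  m.keySet.filter(y => (m(x) & m(y)).nonEmpty)

val groups = chain.keySet.groupBy(x => overlaps(chain, x))
// the three ids end up with three different neighbor sets,
// so nothing merges, even though 1-2-3 form one connected chain
println(groups.values.toSet)
```

Collapsing such chains needs an iterative or graph-based approach (e.g. connected components), not a single self-join pass.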
