提高distinct+groupbykey在spark上的性能

von4xj4u  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(600)

我试图学习Spark,并提出了这个问题,但我的解决方案似乎没有表现得很好。我希望有人能教我如何提高表现。我的问题如下。
我有几百万个元组(例如(a,b),(a,c),(b,c)等),可能有重复的元组(键和值)。我想做的是按键对元组进行分组,为了更有趣,将分组值的长度限制为任意数字(比如3)。
例如,如果我有:

[(A, B), (A, C), (A, D), (A, E), (B, C)]

我希望结果是:

[(A, [B, C, D]), (A, [E]), (B, [C]))

如果列表中的任何一个值超过3,那么它会将其拆分,并使用(a,[e])多次列出同一个键。希望这是有意义的。
我想出的解决办法是:

val myTuples: Array[(String, String)] = ...
sparkContext.parallelize(myTuples)
            .distinct()             // to delete duplicates
            .groupByKey()           // to group up the tuples by key
            .flatMapValues(values => values.grouped(3)) // split up values in groups of 3
            .repartition(sparkContext.defaultParallelism)
            .collect()

我的解决方案还可以,但是有没有更有效的方法呢?我听说groupbykey效率很低。任何帮助都将不胜感激。
还有,我应该为分区选择一个好的数字吗?我注意到了 distinct 接受 partition 但不确定我应该放什么。
谢谢!

wfsdck30

wfsdck301#

您需要稍微重新表述您的问题,因为您实际上并不是按一个键来分组的;在上面的示例中,您为“a”输出了多行。在下面,我添加了一个列,我们还可以使用它来分组(它将每3条记录递增一次),并收集\u list,这是一个sparksql函数,用于生成您要查找的数组。请注意,通过完全使用sparksql,您可以从spark获得许多优化(通过“catalyst”,这是一个查询优化程序)。

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val data = List(("A", "B"), ("A", "C"), ("A", "D"), ("A", "E"), ("B", "C")).toDF("key","value")

val data2 = data.withColumn("index", floor(
  (row_number().over(Window.partitionBy("key").orderBy("value"))-1)/3)
)
data2.show

+---+-----+-----+
|key|value|index|
+---+-----+-----+
|  B|    C|    0|
|  A|    B|    0|
|  A|    C|    0|
|  A|    D|    0|
|  A|    E|    1|
+---+-----+-----+

data2.groupBy("key","index").agg(collect_list("value")).show

+---+-----+-------------------+
|key|index|collect_list(value)|
+---+-----+-------------------+
|  B|    0|                [C]|
|  A|    0|          [B, C, D]|
|  A|    1|                [E]|
+---+-----+-------------------+

相关问题