spark窗口聚合与group by/join性能之比较

2lpgd968  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(1676)

与groupby/join相比,我对在窗口上运行聚合函数的性能特征感兴趣。在本例中,我对具有自定义帧边界或顺序的窗口函数不感兴趣,而只是作为运行聚合函数的一种方式。
请注意,我只对大小适中的数据量的批处理(非流式)性能感兴趣,因此我禁用了以下的广播连接。
例如,假设我们从以下Dataframe开始:

  1. val df = Seq(("bob", 10), ("sally", 32), ("mike", 9), ("bob", 18)).toDF("name", "age")
  2. df.show(false)
  3. +-----+---+
  4. |name |age|
  5. +-----+---+
  6. |bob |10 |
  7. |sally|32 |
  8. |mike |9 |
  9. |bob |18 |
  10. +-----+---+

假设我们要计算每个名称出现的次数,然后对具有匹配名称的行提供该计数。

分组依据/加入

  1. val joinResult = df.join(
  2. df.groupBy($"name").count,
  3. Seq("name"),
  4. "inner"
  5. )
  6. joinResult.show(false)
  7. +-----+---+-----+
  8. |name |age|count|
  9. +-----+---+-----+
  10. |sally|32 |1 |
  11. |mike |9 |1 |
  12. |bob |18 |2 |
  13. |bob |10 |2 |
  14. +-----+---+-----+
  15. joinResult.explain
  16. == Physical Plan ==
  17. * (4) Project [name#5, age#6, count#12L]
  18. +- *(4) SortMergeJoin [name#5], [name#15], Inner
  19. :- *(1) Sort [name#5 ASC NULLS FIRST], false, 0
  20. : +- Exchange hashpartitioning(name#5, 200)
  21. : +- LocalTableScan [name#5, age#6]
  22. +- *(3) Sort [name#15 ASC NULLS FIRST], false, 0
  23. +- *(3) HashAggregate(keys=[name#15], functions=[count(1)])
  24. +- Exchange hashpartitioning(name#15, 200)
  25. +- *(2) HashAggregate(keys=[name#15], functions=[partial_count(1)])
  26. +- LocalTableScan [name#15]

窗口

  1. import org.apache.spark.sql.expressions.Window
  2. import org.apache.spark.sql.{functions => f}
  3. val windowResult = df.withColumn("count", f.count($"*").over(Window.partitionBy($"name")))
  4. windowResult.show(false)
  5. +-----+---+-----+
  6. |name |age|count|
  7. +-----+---+-----+
  8. |sally|32 |1 |
  9. |mike |9 |1 |
  10. |bob |10 |2 |
  11. |bob |18 |2 |
  12. +-----+---+-----+
  13. windowResult.explain
  14. == Physical Plan ==
  15. Window [count(1) windowspecdefinition(name#5, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS count#34L], [name#5]
  16. +- *(1) Sort [name#5 ASC NULLS FIRST], false, 0
  17. +- Exchange hashpartitioning(name#5, 200)
  18. +- LocalTableScan [name#5, age#6]

根据执行计划,窗口化看起来更有效(阶段更少)。所以我的问题是,是否总是这样——我应该总是使用窗口函数来进行这种聚合吗?随着数据的增长,这两种方法的扩展是否相似?极端偏斜(即有些名字比其他名字更常见)怎么办?

fsi0uk1n

fsi0uk1n1#

这取决于数据。更具体地说,这取决于 name 列。如果基数很小,则聚合后的数据将很小,聚合结果可以在联接中广播。在这种情况下,连接将比 window . 另一方面,如果基数大,聚合后的数据量大,那么连接将被规划为 SortMergeJoin ,使用 window 会更有效率。
如果是 window 我们有一个总洗牌+一个排序。如果是 SortMergeJoin 我们在左分支中使用相同的方法(总洗牌+排序),在右分支中使用额外的简化洗牌和排序(通过简化,我的意思是首先聚合数据)。在连接的右侧分支中,我们还对数据进行了额外的扫描。
另外,你可以查看我在spark峰会上的视频,我在那里分析了类似的例子。

c0vxltue

c0vxltue2#

禁用广播,因为你的状态和生成一些数据与定时方法1米和2米的名字随机生成,又名体面的大小,计划2的执行时间似乎确实更好。databricks集群(社区)上的8、8200个分区大小。
生成的计划具有通过窗口排序和计数的智能&正如您所说的更少的阶段。这似乎是关键。在规模上,你可以有更多的分区,但证据驱使我接近2。
我尝试了随机的名字样本(忽略年龄),得到了以下结果:
加入48.361秒对22.028秒的窗口为1m记录为.count
在85.814秒的时间内加入,而在群集重新启动后,Windows for 2m记录的加入时间为50.566秒
加入时长为96.295秒,窗口时长为43.875秒,窗口时长为2m。count
使用的代码:

  1. import scala.collection.mutable.ListBuffer
  2. import scala.util.Random
  3. spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
  4. import org.apache.spark.sql.expressions.Window
  5. import org.apache.spark.sql.{functions => f}
  6. val alpha = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
  7. val size = alpha.size
  8. def randStr(n:Int) = (1 to n).map(_ => alpha(Random.nextInt(size))).mkString
  9. def timeIt[T](op: => T): Float = {
  10. val start = System.currentTimeMillis
  11. val res = op
  12. val end = System.currentTimeMillis
  13. (end - start) / 1000f
  14. }
  15. var names = new ListBuffer[String]()
  16. for (i <- 1 to 2000000 ) {
  17. names += randStr(10)
  18. }
  19. val namesList = names.toSeq
  20. val df = namesList.toDF("name")
  21. val joinResult = df.join(df.groupBy($"name").count, Seq("name"), "inner")
  22. val windowResult = df.withColumn("count", f.count($"*").over(Window.partitionBy($"name")))
  23. val time1 = timeIt(joinResult.count)
  24. val time2 = timeIt(windowResult.count)
  25. println(s"join in $time1 seconds vs $time2 seconds for window")

此外,这个问题还证明了spark优化器的不成熟性。

展开查看全部

相关问题