sparksql:为什么我在spark ui中看到3个作业而不是一个作业?

tf7tbtn2  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(482)

据我所知,每人将有一份工作 action 在Spark里。
但我经常看到,一个动作触发的工作不止一个。我试图通过在数据集上进行简单的聚合来测试这一点,以从每个类别(这里是“subject”字段)中获得最大值
在检查spark ui时,我可以看到有3个“作业”为 groupBy 手术,而我只期待一个。
有人能帮我理解为什么只有3而不是1吗?

students.show(5)

    +----------+--------------+----------+----+-------+-----+-----+
    |student_id|exam_center_id|   subject|year|quarter|score|grade|
    +----------+--------------+----------+----+-------+-----+-----+
    |         1|             1|      Math|2005|      1|   41|    D|
    |         1|             1|   Spanish|2005|      1|   51|    C|
    |         1|             1|    German|2005|      1|   39|    D|
    |         1|             1|   Physics|2005|      1|   35|    D|
    |         1|             1|   Biology|2005|      1|   53|    C|
    |         1|             1|Philosophy|2005|      1|   73|    B|

  // Task : Find Highest Score in each subject
  val highestScores = students.groupBy("subject").max("score")
  highestScores.show(10)

+----------+----------+
|   subject|max(score)|
+----------+----------+
|   Spanish|        98|
|Modern Art|        98|
|    French|        98|
|   Physics|        98|
| Geography|        98|
|   History|        98|
|   English|        98|
|  Classics|        98|
|      Math|        98|
|Philosophy|        98|
+----------+----------+
only showing top 10 rows

在检查spark ui时,我可以看到有3个“作业”为 groupBy 手术,而我只期待一个。


有人能帮我理解为什么只有3而不是1吗?

== Physical Plan ==

* (2) HashAggregate(keys=[subject#12], functions=[max(score#15)])

+- Exchange hashpartitioning(subject#12, 1)
   +- *(1) HashAggregate(keys=[subject#12], functions=[partial_max(score#15)])
      +- *(1) FileScan csv [subject#12,score#15] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/lab/SparkLab/files/exams/students.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<subject:string,score:int>
m2xkgtsf

m2xkgtsf1#

我建议你检查一下身体计划-

highestScores.explain()

你可能会看到-


* (2) HashAggregate(keys=[subject#9], functions=[max(score#12)], output=[subject#9, max(score)#51])

+- Exchange hashpartitioning(subject#9, 2)
   +- *(1) HashAggregate(keys=[subject#9], functions=[partial_max(score#12)], output=[subject#9, max#61])

[Map阶段]阶段#1是实现局部聚合(部分聚合),然后使用 hashpartitioning(subject) . 注意hashpartitioner使用 group by
[减少阶段]阶段#2是合并阶段#1的输出以获得最终结果 max(score) 这实际上是用来打印前10条记录的 show(10)

vh0rcniy

vh0rcniy2#

我认为只有#3完成了实际的“工作”(执行一个计划,如果您打开sql选项卡上的查询详细信息,您将看到该计划)。另外两个是准备步骤--

1正在查询要生成的namenode InMemoryFileIndex 读取csv,以及

2正在对要执行的数据集进行采样 .groupBy("subject").max("score") 内部需要 sortByKey (这里有更多的细节)。

相关问题