pyspark Spark启动所有工作线程的作业,但在执行count()时在单个工作线程中仅使用1个执行器

dxxyhpgq  于 2022-12-03  发布在  Spark
关注(0)|答案(1)|浏览(158)

我有一个DF,它是分区的,相对较小。我尝试做一个简单的count()。在开始时,所有工作者和执行者都参与任务,但在作业的某个点上,只有1个工作者和1个核心在工作。即使数据以平衡的方式分布在工作者之间。
我已经尝试合并到1,也重新分区到2* 核心数量,仍然没有效果-无论我在这个DF上做什么样的操作,它总是从所有工作者开始,并保持只在一个1上工作。
如果有人知道哪里出了问题我会很感激。
DF信息:

Total Count: 
13065

Partitions: 
+--------------------+-----+
|SPARK_PARTITION_ID()|count|
+--------------------+-----+
|                   9| 5557|
|                  10|   62|
|                  11|  167|
|                   0|  128|
|                   1|   83|
|                   2|  110|
|                   3|  129|
|                   4|  131|
|                   5|   78|
|                   6| 6429|
|                   7|   39|
|                   8|  152|
+--------------------+-----+

应用程序主文件的屏幕截图:
DAG:

活动时间表:

任务:

uqjltbpv

uqjltbpv1#

任务始终在单个执行者上执行:因为它是Spark执行器所做的最基本的工作。
通过查看包含任务表的图像,我们可以看到12个任务(这些是您阶段中的所有任务还是还有更多任务?):其中大多数耗时〈1 s,但有一个耗时4.6min。
有趣的观察确实!这是有道理的,如果你有一个任务,需要的时间比所有其他任务长得多,你可以结束与一个单一的执行者不得不单独计算在最后。
因此,实际上您的问题更多的是数据不对称问题:为什么这一项任务比其他任务花费的时间要长得多?要回答这个问题,我们没有足够的信息来回答你的问题。我将从以下几个方面开始:

  • 正如您在stage的屏幕截图中所看到的,您的stage从shuffledRowRDD读取。这意味着它被某个东西分区。它被什么分区?它读取了多少个shuffled分区?您的spark.sql.shuffle.partitions配置的值是多少?
  • 在任务表中,可以看到GC时间为1秒(比其他任务大得多)。您是否在计算大对象?对象大小的差异是否可能真的很大?

希望这对你弄清楚这一点有帮助!

相关问题