假设我有一些数据都在同一个分区上(我执行了 .coalesce(1) 在Dataframe上)。我现在要对数据进行分组并对其执行聚合。如果我用 .groupBy 在Dataframe上,组会被放置到不同的节点上吗?如果这是真的,我想避免这种情况,因为我想在不太混乱的情况下对这些组执行这些计算。
.coalesce(1)
.groupBy
ha5z0ras1#
视情况而定。默认情况下,分区数由 spark.sql.shuffle.partitions . 避免这种情况的一种方法是使用 repartition 使用显式分区表达式而不是 coalesce :
spark.sql.shuffle.partitions
repartition
coalesce
val df = sparkSession.createDataFrame( sparkContext.parallelize(Seq(Row(1, "a"), Row(1, "b"), Row(2, "c"))), StructType(List(StructField("foo", IntegerType, true), StructField("bar", StringType, true)))) df.repartition(numPartitions = 1, $"foo").groupBy("foo").agg(count("*")).explain()
一般来说,可以使用sparkwebui并在“stages”选项卡上监视shuffle读/写度量。
ee7vknir2#
第一, coalesce(1) 不能保证所有数据都在一个节点中,这是您必须使用的资源 repartition(1) ,这将强制在单个节点中合并所有数据。 coalesce 只对同一节点中的分区进行分组,因此如果您的数据分布在5个节点中(每个节点中有多个分区),那么它将在最后保留5个分区。 repartition 强制洗牌,将所有数据移动到单个节点。但是,如果您关心的是聚合中的分区数,那么这取决于聚合是否只是一个 reduce 在所有的数据中,sparksql会先尝试减少每个节点的数据,然后减少每个节点的结果,一个例子就是计数。但是对于bucketized聚合,就像用一个id计算元素的数量一样,spark要做的是首先减少每个节点中的元素,然后将数据洗牌到bucket中,以确保每个节点的所有减少,对于相同的id,都在同一个节点中,然后再次减少它们。bucket的数量是用属性配置的 spark.sql.shuffle.partitions ,并且每个任务都将作为作业中的一个任务执行。小心,因为 spark.sql.shuffle.partitions 这样做可能会使进程的其他部分(如连接或大聚合)变慢,或者导致内存不足错误。
coalesce(1)
repartition(1)
reduce
2条答案
按热度按时间ha5z0ras1#
视情况而定。默认情况下,分区数由
spark.sql.shuffle.partitions
. 避免这种情况的一种方法是使用repartition
使用显式分区表达式而不是coalesce
:一般来说,可以使用sparkwebui并在“stages”选项卡上监视shuffle读/写度量。
ee7vknir2#
第一,
coalesce(1)
不能保证所有数据都在一个节点中,这是您必须使用的资源repartition(1)
,这将强制在单个节点中合并所有数据。coalesce
只对同一节点中的分区进行分组,因此如果您的数据分布在5个节点中(每个节点中有多个分区),那么它将在最后保留5个分区。repartition
强制洗牌,将所有数据移动到单个节点。但是,如果您关心的是聚合中的分区数,那么这取决于聚合是否只是一个
reduce
在所有的数据中,sparksql会先尝试减少每个节点的数据,然后减少每个节点的结果,一个例子就是计数。但是对于bucketized聚合,就像用一个id计算元素的数量一样,spark要做的是首先减少每个节点中的元素,然后将数据洗牌到bucket中,以确保每个节点的所有减少,对于相同的id,都在同一个节点中,然后再次减少它们。bucket的数量是用属性配置的spark.sql.shuffle.partitions
,并且每个任务都将作为作业中的一个任务执行。小心,因为spark.sql.shuffle.partitions
这样做可能会使进程的其他部分(如连接或大聚合)变慢,或者导致内存不足错误。