spark:当一个groupby被调用时,重新分区到一个节点的Dataframe是否会经历洗牌?

axr492tv  于 2021-05-27  发布在  Hadoop
关注(0)|答案(2)|浏览(463)

假设我有一些数据都在同一个分区上(我执行了 .coalesce(1) 在Dataframe上)。我现在要对数据进行分组并对其执行聚合。如果我用 .groupBy 在Dataframe上,组会被放置到不同的节点上吗?
如果这是真的,我想避免这种情况,因为我想在不太混乱的情况下对这些组执行这些计算。

ha5z0ras

ha5z0ras1#

视情况而定。默认情况下,分区数由 spark.sql.shuffle.partitions . 避免这种情况的一种方法是使用 repartition 使用显式分区表达式而不是 coalesce :

val df = sparkSession.createDataFrame(
  sparkContext.parallelize(Seq(Row(1, "a"), Row(1, "b"), Row(2, "c"))),
  StructType(List(StructField("foo", IntegerType, true), StructField("bar", StringType, true))))
df.repartition(numPartitions = 1, $"foo").groupBy("foo").agg(count("*")).explain()

一般来说,可以使用sparkwebui并在“stages”选项卡上监视shuffle读/写度量。

ee7vknir

ee7vknir2#

第一, coalesce(1) 不能保证所有数据都在一个节点中,这是您必须使用的资源 repartition(1) ,这将强制在单个节点中合并所有数据。 coalesce 只对同一节点中的分区进行分组,因此如果您的数据分布在5个节点中(每个节点中有多个分区),那么它将在最后保留5个分区。 repartition 强制洗牌,将所有数据移动到单个节点。
但是,如果您关心的是聚合中的分区数,那么这取决于聚合是否只是一个 reduce 在所有的数据中,sparksql会先尝试减少每个节点的数据,然后减少每个节点的结果,一个例子就是计数。但是对于bucketized聚合,就像用一个id计算元素的数量一样,spark要做的是首先减少每个节点中的元素,然后将数据洗牌到bucket中,以确保每个节点的所有减少,对于相同的id,都在同一个节点中,然后再次减少它们。bucket的数量是用属性配置的 spark.sql.shuffle.partitions ,并且每个任务都将作为作业中的一个任务执行。小心,因为 spark.sql.shuffle.partitions 这样做可能会使进程的其他部分(如连接或大聚合)变慢,或者导致内存不足错误。

相关问题