如何配置Spark在join或groupby之后调整输出分区的数量?

sdnqo3pr  于 2023-05-23  发布在  Apache
关注(0)|答案(1)|浏览(336)

bounty还有6天到期。回答此问题可获得+300声望奖励。Rinat Veliakhmedov正在寻找典型答案

我知道你可以设置spark.sql.shuffle.partitionsspark.sql.adaptive.advisoryPartitionSizeInBytes。前者不适用于自适应查询执行,而后者由于某种原因只适用于第一次 Shuffle ,之后它只使用默认的分区数量,即#核心。
有没有办法配置AQE来调整分区的数量,使每个分区不超过100MB?

z8dt9xmd

z8dt9xmd1#

不确定您正在使用哪个版本的Spark,但您可以尝试将spark.sql.adaptive.coalescePartitions.minPartitionNum设置为某个值,开始时您可以尝试使用与sql.shuffle.partitions相同的值
我希望通过这个设置,你可以同时拥有这两种功能--小分区的自动合并+aqe对偏斜的处理,但是当有很多事情要做时,它会尝试从spark.sql.adaptive.coalescePartitions.minPartitionNum中保留最小数量的分区
目前,我还没有看到任何其他方法来强制spark动态计算它,以保持分区不大于100 mb。
为什么我认为它可能会改变一些事情:
以下是此参数的说明:

  1. val COALESCE_PARTITIONS_MIN_PARTITION_NUM =
  2. buildConf("spark.sql.adaptive.coalescePartitions.minPartitionNum")
  3. .internal()
  4. .doc("(deprecated) The suggested (not guaranteed) minimum number of shuffle partitions " +
  5. "after coalescing. If not set, the default value is the default parallelism of the " +
  6. "Spark cluster. This configuration only has an effect when " +
  7. s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
  8. s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true.")
  9. .version("3.0.0")
  10. .intConf
  11. .checkValue(_ > 0, "The minimum number of partitions must be positive.")
  12. .createOptional

所以它是可选的,现在让我们检查它在哪里使用Spark代码:

  1. // Ideally, this rule should simply coalesce partitions w.r.t. the target size specified by
  2. // ADVISORY_PARTITION_SIZE_IN_BYTES (default 64MB). To avoid perf regression in AQE, this
  3. // rule by default tries to maximize the parallelism and set the target size to
  4. // `total shuffle size / Spark default parallelism`. In case the `Spark default parallelism`
  5. // is too big, this rule also respect the minimum partition size specified by
  6. // COALESCE_PARTITIONS_MIN_PARTITION_SIZE (default 1MB).
  7. // For history reason, this rule also need to support the config
  8. // COALESCE_PARTITIONS_MIN_PARTITION_NUM. We should remove this config in the future.
  9. val minNumPartitions = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM).getOrElse {
  10. if (conf.getConf(SQLConf.COALESCE_PARTITIONS_PARALLELISM_FIRST)) {
  11. // We fall back to Spark default parallelism if the minimum number of coalesced partitions
  12. // is not set, so to avoid perf regressions compared to no coalescing.
  13. session.sparkContext.defaultParallelism
  14. } else {
  15. // If we don't need to maximize the parallelism, we set `minPartitionNum` to 1, so that
  16. // the specified advisory partition size will be respected.
  17. 1
  18. }
  19. }

看起来,当没有设置此参数并且spark.sql.adaptive.coalescePartitions.parallelismFirst设置为true(默认为true)时,Spark将选择默认并行度作为minPartitionNum。可能这就是您看到分区数量等于核心数量的原因
如果我理解正确的话,如果你设置spark.sql.adaptive.coalescePartitions.minPartitionNum,它应该可以做到这一点,并允许你对分区有更多的控制。
如果它没有帮助,或者您期望其他东西,您可以尝试使用其他sql.adaptive参数并检查它们在源代码中的使用情况。
我认为this blog post可能是一个很好的起点

展开查看全部

相关问题