Apache Spark 什么是OpenCostInBytes?

6gpjuf90  于 2022-12-19  发布在  Apache
关注(0)|答案(1)|浏览(205)

有人能解释一下Apache Spark中的openCostInBytes吗?我可以在文档中看到定义,但我不明白它到底是如何影响文件阅读的。我真的应该关心这个吗?如果是,我应该如何调整它?

wn9m85ua

wn9m85ua1#

spark.files.openCostInBytes将影响输入数据将被读入多少个分区。精确的计算可以在Filepartition.scala中找到。
它在回答这个问题时存在的方式,计算如下:

def maxSplitBytes(
    sparkSession: SparkSession,
    selectedPartitions: Seq[PartitionDirectory]): Long = {
  val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
  val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
  val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
    .getOrElse(sparkSession.leafNodeDefaultParallelism)
  val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
  val bytesPerCore = totalBytes / minPartitionNum

  Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
}

最后一行是有趣的一行。我们取最小值:

  • defaultMaxSplitBytes,它来自spark.sql.files.maxPartitionBytes,缺省值为128 * 1024 * 1024
  • 最大值:
  • openCostInBytes,它来自spark.sql.files.openCostInBytes,缺省值为4 * 1024
  • bytesPerCoretotalBytes / minPartitionNum。默认情况下,minPartitionNum来自spark.default.parallelism,这等于您的内核总数

因此,现在我们知道了这一点,我们可以尝试理解该计算的3种边缘情况(考虑参数的默认值):

  • 如果结果是defaultMaxSplitBytes的值,这是因为bytesPerCore大于其他值。只有在处理大文件时才会发生这种情况。大到如果我们在所有内核上公平分割数据,它将大于defaultMaxSplitBytes因此,我们在此限制每个分区的大小。
  • 如果结果是bytesPerCore的值,则意味着它小于128 MB但大于4 MB。在这种情况下,我们在所有内核上公平地拆分数据。
  • 如果结果是openCostInBytes的值,则意味着bytesPerCore非常小,小于4MB。由于每个分区都有打开成本,因此我们希望限制创建的分区数量。因此,在本例中,我们限制创建的分区数量

通过理解这一点,我们可以看到,只有当您的数据相对于集群很小时(即,如果您的data size / nr of cores in cluster很小),此值才会起作用。
希望这有帮助!

相关问题