有人能解释一下Apache Spark中的openCostInBytes吗?我可以在文档中看到定义,但我不明白它到底是如何影响文件阅读的。我真的应该关心这个吗?如果是,我应该如何调整它?
wn9m85ua1#
spark.files.openCostInBytes将影响输入数据将被读入多少个分区。精确的计算可以在Filepartition.scala中找到。它在回答这个问题时存在的方式,计算如下:
spark.files.openCostInBytes
def maxSplitBytes( sparkSession: SparkSession, selectedPartitions: Seq[PartitionDirectory]): Long = { val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum .getOrElse(sparkSession.leafNodeDefaultParallelism) val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum val bytesPerCore = totalBytes / minPartitionNum Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) }
最后一行是有趣的一行。我们取最小值:
defaultMaxSplitBytes
spark.sql.files.maxPartitionBytes
openCostInBytes
spark.sql.files.openCostInBytes
bytesPerCore
totalBytes / minPartitionNum
minPartitionNum
spark.default.parallelism
因此,现在我们知道了这一点,我们可以尝试理解该计算的3种边缘情况(考虑参数的默认值):
通过理解这一点,我们可以看到,只有当您的数据相对于集群很小时(即,如果您的data size / nr of cores in cluster很小),此值才会起作用。希望这有帮助!
data size / nr of cores in cluster
1条答案
按热度按时间wn9m85ua1#
spark.files.openCostInBytes
将影响输入数据将被读入多少个分区。精确的计算可以在Filepartition.scala中找到。它在回答这个问题时存在的方式,计算如下:
最后一行是有趣的一行。我们取最小值:
defaultMaxSplitBytes
,它来自spark.sql.files.maxPartitionBytes
,缺省值为128 * 1024 * 1024openCostInBytes
,它来自spark.sql.files.openCostInBytes
,缺省值为4 * 1024bytesPerCore
即totalBytes / minPartitionNum
。默认情况下,minPartitionNum
来自spark.default.parallelism
,这等于您的内核总数因此,现在我们知道了这一点,我们可以尝试理解该计算的3种边缘情况(考虑参数的默认值):
defaultMaxSplitBytes
的值,这是因为bytesPerCore
大于其他值。只有在处理大文件时才会发生这种情况。大到如果我们在所有内核上公平分割数据,它将大于defaultMaxSplitBytes
。因此,我们在此限制每个分区的大小。bytesPerCore
的值,则意味着它小于128 MB但大于4 MB。在这种情况下,我们在所有内核上公平地拆分数据。openCostInBytes
的值,则意味着bytesPerCore
非常小,小于4MB。由于每个分区都有打开成本,因此我们希望限制创建的分区数量。因此,在本例中,我们限制创建的分区数量通过理解这一点,我们可以看到,只有当您的数据相对于集群很小时(即,如果您的
data size / nr of cores in cluster
很小),此值才会起作用。希望这有帮助!