我正在查看一个通过运行Spark查询生成的物理计划。该查询读取一个parquet文件并进行一些聚合。在物理计划中,有一个名为ColumnarToRow
的操作符,它有一个名为"number of input batchs"的统计信息。我很好奇这个输入批处理数是如何确定的?它似乎取决于parquet文件中行组的数量,但不完全取决于。
下面是我的代码:
df1 = spark.read.parquet('data/')
.select('col1')
.groupby('col1')
.agg(f.count('col1').alias('ct'))
.toPandas()
下面是ColumnarToRow运算符统计信息:
ColumnarToRow
number of output rows: 327,069
number of input batches: 80
1条答案
按热度按时间bnl4lu3b1#
这个
ColumnarToRow
块的存在是因为你正在阅读一个parquet文件,Parquet文件以一种面向列的方式存储,这带来了很多好处。但是在Apache Spark中,RDD是以面向行的方式存储的,这使我们能够高效地执行
map
、reduce
、groupBy
等经典操作。现在,如果我们快速浏览一下生成您所谈论的这些数字的源代码(使用Spark v3.3.1),我们会在
Columnar.scala
中看到以下代码:我们可以看到
numInputBatches
瓦尔在mapPartitionsInternal
函数中递增(numInputBatches += 1
),这意味着numInputBatches
表示您正在阅读的parquet文件的分区数!你应该能够通过在一个(py)Spark壳中进行以下操作来验证这一点:
希望这有帮助!