pyspark Spark物理计划:ColumnarToRow运算符中输入批处理数的含义

dfty9e19  于 2023-01-01  发布在  Spark
关注(0)|答案(1)|浏览(468)

我正在查看一个通过运行Spark查询生成的物理计划。该查询读取一个parquet文件并进行一些聚合。在物理计划中,有一个名为ColumnarToRow的操作符,它有一个名为"number of input batchs"的统计信息。我很好奇这个输入批处理数是如何确定的?它似乎取决于parquet文件中行组的数量,但不完全取决于。
下面是我的代码:

  1. df1 = spark.read.parquet('data/')
  2. .select('col1')
  3. .groupby('col1')
  4. .agg(f.count('col1').alias('ct'))
  5. .toPandas()

下面是ColumnarToRow运算符统计信息:

  1. ColumnarToRow
  2. number of output rows: 327,069
  3. number of input batches: 80
bnl4lu3b

bnl4lu3b1#

这个ColumnarToRow块的存在是因为你正在阅读一个parquet文件,Parquet文件以一种面向列的方式存储,这带来了很多好处。
但是在Apache Spark中,RDD是以面向行的方式存储的,这使我们能够高效地执行mapreducegroupBy等经典操作。
现在,如果我们快速浏览一下生成您所谈论的这些数字的源代码(使用Spark v3.3.1),我们会在Columnar.scala中看到以下代码:

  1. override lazy val metrics: Map[String, SQLMetric] = Map(
  2. "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
  3. "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches")
  4. )
  5. override def doExecute(): RDD[InternalRow] = {
  6. val numOutputRows = longMetric("numOutputRows")
  7. val numInputBatches = longMetric("numInputBatches")
  8. // This avoids calling `output` in the RDD closure, so that we don't need to include the entire
  9. // plan (this) in the closure.
  10. val localOutput = this.output
  11. child.executeColumnar().mapPartitionsInternal { batches =>
  12. val toUnsafe = UnsafeProjection.create(localOutput, localOutput)
  13. batches.flatMap { batch =>
  14. numInputBatches += 1
  15. numOutputRows += batch.numRows()
  16. batch.rowIterator().asScala.map(toUnsafe)
  17. }
  18. }
  19. }

我们可以看到numInputBatches瓦尔在mapPartitionsInternal函数中递增(numInputBatches += 1),这意味着numInputBatches表示您正在阅读的parquet文件的分区数!
你应该能够通过在一个(py)Spark壳中进行以下操作来验证这一点:

  1. spark.read.parquet('data/').rdd.getNumPartitions()

希望这有帮助!

展开查看全部

相关问题