apachespark:获取每个分区的记录数

pokxtpni  于 2021-05-29  发布在  Hadoop
关注(0)|答案(5)|浏览(587)

我想检查一下,当spark作业以yarn cluster的deploy模式提交以便在控制台上记录或打印时,如何获得每个分区的信息,比如驱动端每个分区的记录总数。

sirbozc5

sirbozc51#

对于未来的Pypark用户:

from pyspark.sql.functions  import spark_partition_id
rawDf.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().show()
vdgimpew

vdgimpew2#

每个分区的记录数如下:

df
  .rdd
  .mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}
  .toDF("partition_number","number_of_records")
  .show

但这也会自己启动spark作业(因为spark必须读取文件才能获得记录数)。
spark也可以读取配置单元表统计信息,但我不知道如何显示这些元数据。。

4sup72z8

4sup72z83#

spark/scala公司:

val numPartitions = 20000
val a = sc.parallelize(0 until 1e6.toInt, numPartitions )
val l = a.glom().map(_.length).collect()  # get length of each partition
print(l.min, l.max, l.sum/l.length, l.length)  # check if skewed

Pypark公司:

num_partitions = 20000
a = sc.parallelize(range(int(1e6)), num_partitions)
l = a.glom().map(len).collect()  # get length of each partition
print(min(l), max(l), sum(l)/len(l), len(l))  # check if skewed

同样的情况也可能发生在 dataframe ,不仅仅是为了 RDD . 只需添加 DF.rdd.glom ... 输入上面的代码。
字幕:mike dusenberry@https://issues.apache.org/jira/browse/spark-17817

frebpwbc

frebpwbc4#

spark 1.5解决方案:

( sparkPartitionId() 存在于 org.apache.spark.sql.functions )

import org.apache.spark.sql.functions._ 

df.withColumn("partitionId", sparkPartitionId()).groupBy("partitionId").count.show

正如@raphael roth提到的 mapPartitionsWithIndex 是最好的方法,将适用于所有版本的spark,因为它基于rdd的方法

fgw7neuy

fgw7neuy5#

我会使用内置函数。它应该尽可能地高效:

import org.apache.spark.sql.functions.spark_partition_id

df.groupBy(spark_partition_id).count

相关问题