我正在从hdfs读取一组10000个累积大小为10tb的Parquet文件,并使用以下代码以分区方式将其写回hdfs
spark.read.orc("HDFS_LOC").repartition(col("x")).write.partitionBy("x").orc("HDFS_LOC_1")
我正在使用
spark.sql.shuffle.partitions=8000
我看到spark已经将5000个不同的“x”分区写入hdfs(hdfs\u loc\u 1)。如何在整个过程中使用“8000”的洗牌分区。我看到只有15000个文件被写在“x”的所有分区上。这是否意味着spark试图在“x”的每个分区创建8000个文件,但在写入时发现没有足够的数据在每个分区写入8000个文件,结果写入的文件更少?你能帮我理解这个吗?
1条答案
按热度按时间dtcbnfnu1#
设置
spark.sql.shuffle.partitions=8000
将设置spark程序的默认洗牌分区号。如果您在设置此选项之后尝试执行联接或聚合,您将看到此数字生效(您可以使用df.rdd.getNumPartitions()
). 请参阅此处了解更多信息。不过,在您的情况下,您将此设置用于
repartition(col("x")
以及partitionBy("x")
. 因此,如果不首先使用联接或聚合转换,您的程序将不会受到此设置的影响。两者的区别repartition
以及partitionBy
就是说,首先将数据分区到内存中,创建cardinality("x")
分区数,此时第二个分区将向hdfs写入大致相同的分区数。为什么大约?因为有更多的因素决定了输出文件的确切数量。请查看以下资源以更好地了解此主题:df.repartition和dataframewriter partitionby之间的区别?
pyspark:有效地将partitionby写入与原始表相同数量的总分区
因此,使用按列重新分区时首先要考虑的是
repartition(*cols)
或者partitionBy(*cols)
,是列(或列的组合)具有的唯一值(基数)的数目。也就是说,如果要确保创建8000个分区(即输出文件),请使用
repartition(partitionsNum, col("x"))
在您的例子中,partitionsnum==8000,然后调用write.orc("HDFS_LOC_1")
. 否则,如果您想保持分区数接近x的基数,只需调用partitionBy("x")
到你原来的df然后write.orc("HDFS_LOC_1")
用于将数据存储到hdfs。这将创建cardinality(x)
包含分区数据的文件夹。