spark如何将分区和分区按标签一起洗牌

n3ipq98p  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(361)

我正在从hdfs读取一组10000个累积大小为10tb的Parquet文件,并使用以下代码以分区方式将其写回hdfs

spark.read.orc("HDFS_LOC").repartition(col("x")).write.partitionBy("x").orc("HDFS_LOC_1")

我正在使用

spark.sql.shuffle.partitions=8000

我看到spark已经将5000个不同的“x”分区写入hdfs(hdfs\u loc\u 1)。如何在整个过程中使用“8000”的洗牌分区。我看到只有15000个文件被写在“x”的所有分区上。这是否意味着spark试图在“x”的每个分区创建8000个文件,但在写入时发现没有足够的数据在每个分区写入8000个文件,结果写入的文件更少?你能帮我理解这个吗?

dtcbnfnu

dtcbnfnu1#

设置 spark.sql.shuffle.partitions=8000 将设置spark程序的默认洗牌分区号。如果您在设置此选项之后尝试执行联接或聚合,您将看到此数字生效(您可以使用 df.rdd.getNumPartitions() ). 请参阅此处了解更多信息。
不过,在您的情况下,您将此设置用于 repartition(col("x") 以及 partitionBy("x") . 因此,如果不首先使用联接或聚合转换,您的程序将不会受到此设置的影响。两者的区别 repartition 以及 partitionBy 就是说,首先将数据分区到内存中,创建 cardinality("x") 分区数,此时第二个分区将向hdfs写入大致相同的分区数。为什么大约?因为有更多的因素决定了输出文件的确切数量。请查看以下资源以更好地了解此主题:
df.repartition和dataframewriter partitionby之间的区别?
pyspark:有效地将partitionby写入与原始表相同数量的总分区
因此,使用按列重新分区时首先要考虑的是 repartition(*cols) 或者 partitionBy(*cols) ,是列(或列的组合)具有的唯一值(基数)的数目。
也就是说,如果要确保创建8000个分区(即输出文件),请使用 repartition(partitionsNum, col("x")) 在您的例子中,partitionsnum==8000,然后调用 write.orc("HDFS_LOC_1") . 否则,如果您想保持分区数接近x的基数,只需调用 partitionBy("x") 到你原来的df然后 write.orc("HDFS_LOC_1") 用于将数据存储到hdfs。这将创建 cardinality(x) 包含分区数据的文件夹。

相关问题