在apachespark的“bucketby”中,如何为每个bucket生成1个文件而不是每个分区为每个bucket生成1个文件?

fv2wmkja  于 2021-06-24  发布在  Hive
关注(0)|答案(2)|浏览(545)

我试图在一个相当大的数据集上使用spark的bucketby特性。

dataframe.write()
    .format("parquet")
    .bucketBy(500, bucketColumn1, bucketColumn2)
    .mode(SaveMode.Overwrite)
    .option("path", "s3://my-bucket")
    .saveAsTable("my_table");

问题是我的spark cluster有大约500个分区/任务/执行器(不确定术语),因此我最终得到的文件如下所示:

part-00001-{UUID}_00001.c000.snappy.parquet
part-00001-{UUID}_00002.c000.snappy.parquet
...
part-00001-{UUID}_00500.c000.snappy.parquet

part-00002-{UUID}_00001.c000.snappy.parquet
part-00002-{UUID}_00002.c000.snappy.parquet
...
part-00002-{UUID}_00500.c000.snappy.parquet

part-00500-{UUID}_00001.c000.snappy.parquet
part-00500-{UUID}_00002.c000.snappy.parquet
...
part-00500-{UUID}_00500.c000.snappy.parquet

那是500x500=250000的 parquet 锉刀!这需要永远的时间 FileOutputCommitter 把它交给s3。
有没有一种方法可以像在hive中那样,为每个bucket生成一个文件?还是有更好的方法来解决这个问题?现在看来,我必须在降低集群的并行性(减少写入程序的数量)和降低Parquet文件的并行性(减少桶的数量)之间做出选择。
谢谢

fjnneemd

fjnneemd1#

这应该能解决问题。

dataframe.write()
.format("parquet")
.bucketBy(1, bucketColumn1, bucketColumn2)
.mode(SaveMode.Overwrite)
.option("path", "s3://my-bucket")
.saveAsTable("my_table");

将bucketby函数的输入参数修改为1。你可以从spark的git存储库中查看bucketby的代码-https://github.com/apache/spark/blob/f8d59572b014e5254b0c574b26e101c2e4157bdd/sql/core/src/main/scala/org/apache/spark/sql/dataframewriter.scala
第一个拆分的part-00001和part-00002基于保存带扣表时运行的并行任务数。在你的例子中,有500个并行任务在运行。每个零件文件中的文件数取决于为bucketby函数提供的输入。
要了解有关spark任务、分区、执行器的更多信息,请查看我的媒体文章-https://medium.com/@tharun026

628mspwn

628mspwn2#

为了每个最终bucket获得1个文件,请执行以下操作。在将dataframe写入表之前,使用与bucketing使用的列完全相同的列重新分区,并将新分区的数量设置为等于bucketby中使用的bucket数量(或者一个较小的数字,它是bucket数量的除数,尽管我看不出有理由在这里使用较小的数字)。
在您的情况下,可能会这样:

dataframe.repartition(500, bucketColumn1, bucketColumn2)
    .write()
    .format("parquet")
    .bucketBy(500, bucketColumn1, bucketColumn2)
    .mode(SaveMode.Overwrite)
    .option("path", "s3://my-bucket")
    .saveAsTable("my_table");

在保存到现有表的情况下,需要确保列的类型完全匹配(例如,如果您的列x在dataframe中是int,但是表中的bigint将被插入到由x重新划分为500个bucket的表中,它与被x重新划分为bigint的表不匹配,最终500个执行者中的每个人都将再次编写500个文件)。
只是要100%清楚-这种重新分区将为您的执行添加另一个步骤,即为1个执行器上的每个bucket收集数据(因此,如果数据之前没有以相同的方式分区,则需要进行一次完整的数据重新排列)。我想这正是你想要的。
在另一个答案的评论中也提到,如果你的按键倾斜,你需要为可能出现的问题做好准备。这是真的,但是如果在加载表之后做的第一件事就是聚合/联接所绑定的相同列(对于选择按这些列进行绑定的人来说,这似乎是一种非常可能的情况),那么默认的spark行为并不能完全帮助您。相反,您将得到一个延迟问题,并且只有在写入之后尝试加载数据时才会看到偏差。
在我看来,如果spark提供一个设置,在编写一个bucked表之前(尤其是在插入到现有表中时)总是对数据进行重新分区,那就太好了。

相关问题