当我使用Spark写入S3时,我正在尝试将我的数据分割为1 GB。我尝试的方法是以GB为单位计算DeltaTable的大小(定义_coalesce函数),四舍五入,并使用该数字在S3中写入:
# Vaccum to leave 1 week of history
deltaTable = DeltaTable.forPath(spark, f"s3a://{delta_table}")
deltaTable.vacuum(168)
deltaTable.generate("symlink_format_manifest")
# Reading delta table and rewriting with coalesce to reach 1GB per file
df = spark.read.format('delta').load(f"s3a://{delta_table}")
coalesce_number = define_coalesce(delta_table) < this function calculates the size of the delta in GB
df.coalesce(coalesce_number).write.format("delta").mode('overwrite').option('overwriteSchema', 'true').save(f"s3a://{delta_table}")
deltaTable = DeltaTable.forPath(spark, f"s3a://{delta_table}")
deltaTable.generate("symlink_format_manifest")
我之所以尝试这种方式,是因为我们的Delta是开源的,我们没有内置的优化方法。
我做了一些搜索,找到了Spark中的spk.sql.files.MaxPartitionBytes配置,但有人说它解决不了他们的问题,而且这个配置在读和写时都会分区。
有什么建议吗?
1条答案
按热度按时间3ks5zfa01#
我理解您的问题,以及您想要做的事情,但我不确定您当前解决方案的结果是什么。如果分区仍然不等于1 GB,您可以尝试用重新分区替换COALESSE。Coalesce不保证此操作后分区相等,因此您的公式可能不起作用。如果您知道输出上需要多少个分区,请使用reartition(COALESSE_NUMBER),它应该使用轮询创建相等的分区
如果问题出在计算数据集大小(即分区数量)的函数上,我知道两种解决方案:
1.您可以缓存数据集,然后从统计中获取其大小。当然,这可能会有问题,因此您必须花费一些资源。在第一个答案中执行了类似的操作:How spark get the size of a dataframe for broadcast?
1.您可以计算计数并将其除以您希望在单个分区中拥有的记录数量。单个记录的大小取决于您的方案,估计它可能很困难,但尝试一下是可行的选择