什么是最好的方式来清理失败的spark作业的输出?

ioekq8ef  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(414)

我正在运行多个spark作业,从各种s3 bucket读取数据,转换为parquet格式,然后写入单个s3 bucket,作为单个数据源从athena查询。
一个作业可能会因为各种原因(oom、s3节流等)而失败,当它失败时,常常会留下成百上千的错误 part-*.snappy.parquet 目标s3存储桶中的文件。我的输出数据也是这样分区的: /year=2020/month=8/day=8/hour=0/ . 所有作业运行都会写入同一文件夹,并且一个作业可能会在多个分区中产生结果。因为这个,我想我不能用 overwrite 模式,因为它将覆盖以前成功运行的作业的结果。在清理部分完成的作业运行时,是否有最佳做法?
我考虑过的一些选择:
每次作业运行都会在所有文件名中使用guid写入文件。这可用于手动查找失败作业运行中的文件并将其删除。这样做的问题是guid没有暴露给spark作业(据我所知),因此查找失败作业的guid(当然没有手动检查)似乎是一个挑战。
将文件写入不是最终目标的位置,并且只有在作业成功完成后,才能将这些文件移动到其最终位置。
选项2看起来很合理,但是需要额外的组件到这个etl管道中。
作为一个新的spark/aws胶水用户,在我尝试重新发明轮子之前找出最佳实践是很好的。什么样的模式是清理失败的Spark工作,他们的优点和缺点是什么?

11dmarpk

11dmarpk1#

因此,每当我们使用spark写入数据时,我们只需要传递文件夹路径,而无法控制文件名。现在考虑到这一点,您可以在写入目标目录时始终覆盖输出。

df.write.mode('overwrite').parquet(path)

这样你的目标就会永远被我们取代。

相关问题