通过pyspark Databricks从Azure BLOB容器中存储的增量表的分区中删除/覆盖数据的最佳方法是什么?

xxhby3vn  于 12个月前  发布在  Spark
关注(0)|答案(1)|浏览(141)

我运行了一个Databricks notebook来通过Datafactory处理一些数据。然后数据以delta表的形式存储到Blob容器中。被转换的记录数量以亿为单位。因此,源数据被预先划分为分区,然后Datafactory从一个循环活动中调用数据转换Databricks notebook。
每个Notebook处理一堆分区。例如:如果数据被划分为12个分区。我们创建3个分区组,每个分区组包含4个分区。这样,Datafactory将在for循环中调用3个Databricks Notebook示例进行并行执行。
我正在实现逻辑来处理只有部分分区成功转换的情况。我们希望跳过组中成功完成的分区,只转换剩余的分区。
我发现下面的方法来实现它在以下方式:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")   
data.write.mode("overwrite").format("delta").partitionBy("partition_id").save(folder_name)

字符串
问题是我们有其他用例,我们使用“overwrite”模式完全替换数据,而不是仅仅替换特定数据。我正在考虑将partitionOverwriteMode值从“dynamic”转换为“static”。有没有什么方法可以让我只在执行代码行上方时使用“dynamic”设置,而不是在spark config上设置它?

hm2xizp9

hm2xizp91#

您可以在docs中阅读:
还可以通过将DataFrameWriter选项partitionOverwriteMode设置为dynamic来启用此功能
因此,要在查询级别应用它:

data.write
    .mode("overwrite").format("delta").partitionBy("partition_id")
    .option("partitionOverwriteMode", "dynamic")
    .save(folder_name)

字符串
顺便说一句,这种方法也适用于许多其他选择(尽管不是全部)。

相关问题