pyspark 使用Spark删除Azure blob中的文件的更快方法是什么?

1mrurvl1  于 2023-10-15  发布在  Spark
关注(0)|答案(4)|浏览(103)

我正在Azure上使用Databricks/Spark。我想删除超过100,000个存储在blob中的文本文件。blob已安装。我使用Python(PySpark)以并行方式删除这些文件的代码如下所示。

import os

# use dbutils to list all files
# very slow operation
text_files = ['/dbfs/mnt/myblob/myfolder/{}'.format(fi.name) for fi in dbutils.fs.ls('/dbfs/mnt/myblob/myfolder')]

# use spark context to parallelize list of text files
rdd = sc.parallelize(text_files)

# now delete files
# seems to be very slow as well when spark job kicks off
rdd.map(lambda p: os.remove(p)).count()

请注意,我使用dbutils列出挂载目录中的文件。这个操作很慢。还要注意,我使用Python的os.remove(...)来删除文件,这个操作似乎也很慢。
在Spark/Databricks的上下文中,有没有更快地删除Azure blob中文件的方法?
还有相关的帖子herehere。然而,前者没有指定上下文/环境(引用的链接是针对Scala的,这些方法在PySpark中的Spark上下文中不存在),后者希望实现并行删除,但没有说明解决方案是否需要使用Spark。

mqxuamgl

mqxuamgl1#

目前,您正在从分布式文件系统中阅读数据,然后将它们导入到Python驱动程序中,在Spark上导出它们,然后使用Python再次读取它们以执行删除操作。
我认为你可以删除spark步骤,在python中操作你的列表,直接使用dbutils删除文件。

k2arahey

k2arahey2#

我意识到没有直接回答你的问题,但是看到你正在使用Azure并且可能使用数据工厂(或者将来在编排/生产数据砖工作时可能会使用)。
您可能希望考虑数据工厂删除活动,这是数据工厂v2的一个合理的新添加:https://learn.microsoft.com/en-us/azure/data-factory/delete-activity
它具有并行删除的能力。

xcitsw88

xcitsw883#

我在Azure Blob Gen2存储上使用了以下方法(scala):
subMask包含partitionBy在parquet文件持久化期间创建的结构。

  • mssparkutils.fs.rm(x,true)* 递归地删除目录。
val rootPath = "abfss://**@**.dfs.core.windows.net/parquet/poc-out/"
    val subMask = s"/year=$year/month=$month/day=$dayOfMonth/"    

    mssparkutils.fs.ls(rootPath).filter(_.isDir).foreach( r => 
        {
            val sd = s"${r.path}$subMask"
            val pathExists = mssparkutils.fs.exists(sd)
            if(pathExists){
                mssparkutils.fs.rm(sd, true) //remove path + files
            }
        }
    )
jdgnovmf

jdgnovmf4#

file_location="give the file location"
objectName="abc"
listOfFile = dbutils.fs.ls(file_location)
fileNameSamePattern=[]

for file in listOfFile:
  if file.name.startswith(objectName):
    fileNameSamePattern.append(file.name)
print("File List of same pattern",fileNameSamePattern)

#deleting files from blob

for file in fileNameSamePattern:
    source_path = file_location + file
    dbutils.fs.rm(source_path)

相关问题