是否可以在pyspark中使用udf进行基于文件的处理?

holgip5t  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(501)

我定义了一个udf,它使用一个Dataframe执行以下操作,其中一列包含azure blob存储中zip文件的位置(我在没有spark的情况下测试了udf,结果成功了):
从blob下载已定义的文件,并将其保存在excutor/driver上的某个位置
提取blob的某个文件并将其保存在excutor/driver上
有了这个自定义项,我体验到的速度就像我在文件上循环python一样快。那么,有没有可能在spark中完成这种任务呢?我想使用spark来并行化下载和解压以加快速度。我通过ssh连接到excutor和驱动程序(它是一个测试集群,所以每个集群只有一个),发现excutor上只处理了数据,驱动程序什么都没做。为什么会这样?
下一步是将提取的文件(普通csv)读取到sparkDataframe。但是,如果文件分布在excutor和driver上,如何实现这一点呢?我还没有找到进入excustors仓库的方法。或者可以在udf中定义一个公共位置,将其写回驱动程序中的某个位置?
我想阅读比提取的文件有:

data_frame = (
  spark
    .read
    .format('csv')
    .option('header', True)
    .option('delimiter', ',')  
    .load(f"/mydriverpath/*.csv"))

如果有其他方法来并行下载和解压缩的文件,我会很高兴听到它。

blpfk2vs

blpfk2vs1#

pyspark读写器使并行读写文件变得容易。在spark中工作时,通常不应在驱动程序节点上循环文件或保存数据。
假设您有100个gzip csv文件在 my-bucket/my-folder 目录。下面是如何将它们并行读入Dataframe:

df = spark.read.csv("my-bucket/my-folder")

下面是如何将它们写入50个snappy压缩Parquet文件(并行):

df.repartition(50).write.parquet("my-bucket/another-folder")

读者/作家为你做所有的重担。更多信息请参见此处 repartition .

相关问题