我定义了一个udf,它使用一个Dataframe执行以下操作,其中一列包含azure blob存储中zip文件的位置(我在没有spark的情况下测试了udf,结果成功了):
从blob下载已定义的文件,并将其保存在excutor/driver上的某个位置
提取blob的某个文件并将其保存在excutor/driver上
有了这个自定义项,我体验到的速度就像我在文件上循环python一样快。那么,有没有可能在spark中完成这种任务呢?我想使用spark来并行化下载和解压以加快速度。我通过ssh连接到excutor和驱动程序(它是一个测试集群,所以每个集群只有一个),发现excutor上只处理了数据,驱动程序什么都没做。为什么会这样?
下一步是将提取的文件(普通csv)读取到sparkDataframe。但是,如果文件分布在excutor和driver上,如何实现这一点呢?我还没有找到进入excustors仓库的方法。或者可以在udf中定义一个公共位置,将其写回驱动程序中的某个位置?
我想阅读比提取的文件有:
data_frame = (
spark
.read
.format('csv')
.option('header', True)
.option('delimiter', ',')
.load(f"/mydriverpath/*.csv"))
如果有其他方法来并行下载和解压缩的文件,我会很高兴听到它。
1条答案
按热度按时间blpfk2vs1#
pyspark读写器使并行读写文件变得容易。在spark中工作时,通常不应在驱动程序节点上循环文件或保存数据。
假设您有100个gzip csv文件在
my-bucket/my-folder
目录。下面是如何将它们并行读入Dataframe:下面是如何将它们写入50个snappy压缩Parquet文件(并行):
读者/作家为你做所有的重担。更多信息请参见此处
repartition
.