如何在本地系统中在单个文件中写入sparkDataframe而不使用coalesce

at0kjp5o  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(357)

我想从pysparkDataframe生成一个avro文件,目前我正在做 coalesce 如下所示

df = df.coalesce(1)
df.write.format('avro').save('file:///mypath')

但现在这导致了内存问题,因为所有数据都将在写入之前被提取到内存中,而且我的数据量每天都在不断增长。所以我想按每个分区写入数据,这样数据就可以成片写入磁盘,而不会引起oom问题。我发现了 toLocalIterator 有助于实现这一目标。但我不知道怎么用。我尝试了下面的用法,它返回所有行

iter = df.toLocalIterator()
for i in iter:
    print('writing some data')
    # write the data into disk/file

iter是迭代每一行而不是每个分区。我该怎么做?

xfyts7mz

xfyts7mz1#

当你这么做的时候 df = df.coalesce(1) 所有数据都收集到一个工作节点中。如果由于节点上的资源限制,该节点无法处理如此庞大的数据,那么该作业将失败并出现oom错误。
根据spark文档,tolocarettor返回一个迭代器,其中包含当前数据集中的所有行,它所能消耗的最大内存相当于这个数据集中最大的分区
作家是如何工作的?
第一个分区被发送到驱动程序。如果继续迭代并到达第一个分区的末尾,第二个分区将被发送到驱动程序节点,依此类推,直到最后一个分区。。所以这就是为什么(它能占用的最大内存=最大分区)要确保主节点有足够的ram和磁盘。
next()方法确保在上一个分区处理完成后,拉取下一个分区记录。

what you can do is 

    //batch objects like 1000 per batch
    df.toLocalIterator().foreach(obj => {
      //add object in array
      //if batch size is reached ... 
        //then serialize them and use FileOutputStream and save in local location 
    })

注意:确保缓存parentdf。。否则在某些情况下,每个分区都需要重新计算。

相关问题