我想从pysparkDataframe生成一个avro文件,目前我正在做 coalesce
如下所示
df = df.coalesce(1)
df.write.format('avro').save('file:///mypath')
但现在这导致了内存问题,因为所有数据都将在写入之前被提取到内存中,而且我的数据量每天都在不断增长。所以我想按每个分区写入数据,这样数据就可以成片写入磁盘,而不会引起oom问题。我发现了 toLocalIterator
有助于实现这一目标。但我不知道怎么用。我尝试了下面的用法,它返回所有行
iter = df.toLocalIterator()
for i in iter:
print('writing some data')
# write the data into disk/file
iter是迭代每一行而不是每个分区。我该怎么做?
1条答案
按热度按时间xfyts7mz1#
当你这么做的时候
df = df.coalesce(1)
所有数据都收集到一个工作节点中。如果由于节点上的资源限制,该节点无法处理如此庞大的数据,那么该作业将失败并出现oom错误。根据spark文档,tolocarettor返回一个迭代器,其中包含当前数据集中的所有行,它所能消耗的最大内存相当于这个数据集中最大的分区
作家是如何工作的?
第一个分区被发送到驱动程序。如果继续迭代并到达第一个分区的末尾,第二个分区将被发送到驱动程序节点,依此类推,直到最后一个分区。。所以这就是为什么(它能占用的最大内存=最大分区)要确保主节点有足够的ram和磁盘。
next()方法确保在上一个分区处理完成后,拉取下一个分区记录。
注意:确保缓存parentdf。。否则在某些情况下,每个分区都需要重新计算。