我有一个dask群集,其中有多个工作进程,每个工作进程都有93 GiB = 100 GB内存,整个群集的内存超过2 TiB(请参见下图)。当我在作业运行时查看 Jmeter 板时,它会有一些波动,但看起来总是像图中所示的东西,即没有接近内存限制的地方。然后,其中一个工作线程将由于内存不足而死亡。我真正困惑的是这是如何发生的,为什么它根本没有显示在 Jmeter 板中?(注意我的dask版本足够新,它将每个工作线程的非托管内存显示为浅色)。
我的任务是加载在2D网格(波场)上定义的相对较大的数据集。首先,我希望在时域中对其进行过滤(这意味着一次访问每个点的整个时间轴)。然后,我希望将每个单个时间的所有点的过滤数据写入单独的文件。当这两个任务是独立的时(即,如果我只过滤数据而不写入;或者如果我不过滤数据而只写原始波场),dask工作得很好。然而,当它们结合在一起时,OOM错误会在大型模拟中出现(但在小型模拟中仍然工作得很好)。
原始波场数据(变量:wave_on_slice_channel
)的大小为11.67 GiB。
对于较小的测试模拟(当上述两个任务结合时工作),它仅为20. 75 MiB。
我的(简化)代码如下:
### Function to filter
def filter_wavefield(pos, butter_filter):
filtered = signal.sosfilt(butter_filter,wave_on_slice_channel[pos,:].compute()).astype("float32")
return filtered
### Function to write files
def save_filtered_wavefield(chunk):
# Many lines omitted here for setting up the write
filtered_data = ncfile.createVariable('filtered_data', np.float32, ('data','time'))
filtered_data[:,:] = blocks[chunk].compute()
ncfile.close()
return
### Putting multiple points together into a dask bag to avoid crushing the scheduler
coord_list = [i for i in range(nelem*ngll)]
coord_bag = db.from_sequence(coord_list,npartitions=100)
coord_bag = coord_bag.persist()
wait(coord_bag)
### Submitting tasks for filtering
### and converting back to dask arrays
filtered = coord_bag.map(filter_wavefield, butter_filter)
filtered_waves = filtered.compute() # this is a numpy array
filtered_da = da.from_array(filtered_waves,chunks=wave_on_slice_channel.chunks) # this should be exactly the same in size and shape as the raw wavefield, except this is filtered
blocks = filtered_da.to_delayed().ravel() # Split filtered wavefield by the raw wavefield's original chunks so each writer only sees a portion of the whole wavefield.
无论是小型还是大型模拟,上述代码都能正常工作(因为这只是两个任务之一,即滤波+写入)。作为检查,示出了用于小型模拟的filtered_da
,并且我们可以看到它与来自小型模拟的原始波场完全相同(除了图形层的数量,我认为这只是得到这个dask数组所需的操作数量,所以并不重要?)
当我想把这些过滤后的数据保存到文件中时,**问题出现了。我有类似于上面的东西:
### Use dask bags to avoid too many tasks
file_list = [i for i in range(len(blocks))]
file_bag = db.from_sequence(file_list,npartitions=len(blocks))
file_bag = file_bag.persist()
wait(file_bag)
### Write out expected number of files to receive
### This file is always written so up to here everything is fine.
with open(dest_dir+'/NOF.txt','w') as f:
f.write("The number of expected filtered data files is: %d" % len(blocks)+'\n')
### Submit tasks to write files
### This is where things break
for i in range(len(blocks)):
f.append(client.submit(save_filtered_wavefield,i)).
请注意,传递给每个save_filtered_wavefield
调用的变量只是一个索引i
,然后在该函数中使用blocks[i].compute()
访问数据。我认为这很好,因为过滤也有wave_on_slice_channel[pos,:].compute()
。
我试过从内存中删除一些变量,特别是持久化的coord_bag
,但问题仍然存在。我也试过阅读一些关于在dask上管理内存的文章,但由于我似乎看不到我的 Jmeter 板上的任何东西,我仍然很困惑。
很抱歉长的职位,但任何帮助将不胜感激!
1条答案
按热度按时间uidvcgyl1#
希望我们能提供更详细的答复。
我首先想到的是:在要提交的函数中访问全局dask数组是一个坏主意。你应该只在客户端调用高级API(如array,bag),并编写只在分区上工作的函数(numpy数组)。你通常不应该在worker上调用
compute()
。