Python: memory leak error in Dask when running a job on multiple GPUs

zbdgwd5y  asked on 2023-06-28 in Python

I want to process some text data with sentence-transformers (generating embeddings for the text) on GCP from a Jupyter notebook, using multiple GPUs (2× T4, 15 GB each) and 16 vCPUs (60 GB RAM).
The data is not large, but the workers keep getting restarted because of a memory leak, even though I set the malloc trim threshold (MALLOC_TRIM_THRESHOLD_) from the shell.
My code:

# run export MALLOC_TRIM_THRESHOLD_=65536 from shell before starting dask cluster

!pip install sentence-transformers
import os
import glob
import numpy as np
import gc

import cudf
import dask_cudf
import cupy
import rmm

from dask.distributed import Client, wait, get_worker, get_client
from dask_cuda import LocalCUDACluster

cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES="0,1", n_workers=2, threads_per_worker=4, memory_limit="15GB",\
                           device_memory_limit="24GB", rmm_pool_size="4GB", rmm_maximum_pool_size="15GB") 

client = Client(cluster)

print(client.run(os.getenv, "MALLOC_TRIM_THRESHOLD_")) # 65536

initial_pool_size = 4*10**9
maximum_pool_size = 15*10**9
rmm.reinitialize(pool_allocator=True, managed_memory=True, initial_pool_size=initial_pool_size, 
                 maximum_pool_size=maximum_pool_size, devices=[0,1], logging=True, log_file_name='./tmp/logs/test_sbert_distributed.log')


import dask.dataframe as dd
import pandas as pd
from dask.multiprocessing import get
import random

df = pd.DataFrame({'col_1': ["This is sentence " + str(x) for x in random.sample(range(10**7), 10**7)], 
                             'col_2': ["That is another sentence " + str(x) for x in random.sample(range(10**7), 10**7)]})

cudf_df = cudf.DataFrame.from_pandas(df)
dask_df = dask_cudf.from_cudf(cudf_df, npartitions=8)

from sentence_transformers import SentenceTransformer
import numpy as np

sbert_model = SentenceTransformer('all-MiniLM-L6-v2')

def test_f_str(df, args):
    col1, col2, chunks = args

    for col in [col1, col2]:
        emb = sbert_model.encode(sentences=df[col].to_arrow().to_pylist(), batch_size=1250, show_progress_bar=True)
        semb = np.array([str(x) for x in emb])
        emb_array = dask.array.from_array(semb, chunks=chunks)
        df[col+'_emb'] = emb_array
    return df


chunks = dask_df.map_partitions(lambda x: len(x)).compute().to_numpy()
print(chunks, type(chunks))

[1250000 1250000 1250000 1250000 1250000 1250000 1250000 1250000] <class 'numpy.ndarray'>

dask_df.npartitions, dask_df.persist()
(8, <dask_cudf.DataFrame | 8 tasks | 8 npartitions>)

new_dask_df = dask_df.map_partitions(test_f_str, 
                                 args=('col_1', 'col_2', chunks),\
                                 meta={'col_1':'object',\
                                       'col_2':'object',\
                                       'col_1_emb':'object',\
                                       'col_2_emb':'object'}) 

new_dask_df.dtypes

col_1        object
col_2        object
col_1_emb    object
col_2_emb    object
dtype: object

new_dask_df.compute() # error: WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory 
#  may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. 
#  -- Unmanaged memory: 9.96 GiB -- Worker memory limit: 13.97 GiB

I have tried many of the suggestions from the pages below, but none of them helped with this problem:

https://www.coiled.io/blog/tackling-unmanaged-memory-with-dask
https://stackoverflow.com/questions/71203077/why-does-dask-distributed-auto-memory-trimming-not-work
https://github.com/dask/distributed/issues/5971
https://stackoverflow.com/questions/72180961/dask-memory-leak-workaround
https://stackoverflow.com/questions/58275476/dask-distributed-workers-always-leak-memory-when-running-many-tasks
https://distributed.dask.org/en/stable/worker-memory.html
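
For reference, the manual-trim workaround described in the last link looks roughly like this (a sketch; it assumes glibc-based Linux on the workers):

import ctypes

def trim_memory() -> int:
    # ask glibc to return freed heap memory to the OS
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)

client.run(trim_memory)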

Can someone point out what I am missing?
=============Update===================
I am using the dashboards from https://developer.nvidia.com/blog/gpu-dashboards-in-jupyter-lab/, but the "Worker memory" chart (bytes stored per worker) does not show any "unmanaged" or "leaked" memory.
However, the "GPU memory" chart turns orange and shows memory being spilled, as described in https://distributed.dask.org/en/stable/worker-memory.html#using-the-dashboard-to-monitor-memory-usage.
How can I confirm whether it is CPU memory or GPU memory that is leaking?


ldxq2e6h1#

Your error message suggests that it is system RAM being heavily used, not GPU memory. There is a lot of code here (too much to take in at once), and you do not show how dask_df was created.
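
One rough way to check which of the two is filling up is to query both from inside each worker process, for example (a sketch only; it assumes psutil is installed on the workers and uses cupy, which the question already imports):

def memory_report():
    # host RSS of this worker process, plus used device memory as seen by CUDA
    import psutil, cupy
    rss = psutil.Process().memory_info().rss
    free, total = cupy.cuda.runtime.memGetInfo()
    return {"host_rss_gb": round(rss / 1e9, 2), "gpu_used_gb": round((total - free) / 1e9, 2)}

print(client.run(memory_report))

Running this before and after a batch of partitions is processed shows whether the growth is on the host side or the device side.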
I don't think the following line is OK:

emb_array = dask.array.from_array(semb, chunks=chunks)

This happens in the context of map_partitions, so the input here should be pandas/cudf and the output should be pandas/cudf or numpy/cupy. You should not call the dask API from inside a function that dask itself is running as a task.
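
For illustration, a minimal sketch of what the partition function could look like with no dask calls inside the task (the helper name embed_partition is hypothetical; sbert_model is assumed to be available on the workers, and the embedding-to-string conversion from the question is kept):

def embed_partition(df, col1, col2):
    # cudf partition in, cudf partition out; no dask API inside the task
    for col in [col1, col2]:
        emb = sbert_model.encode(sentences=df[col].to_arrow().to_pylist(),
                                 batch_size=1250, show_progress_bar=False)
        df[col + '_emb'] = [str(x) for x in emb]  # plain host list assigned as a cudf string column
    return df

new_dask_df = dask_df.map_partitions(
    embed_partition, 'col_1', 'col_2',
    meta={'col_1': 'object', 'col_2': 'object',
          'col_1_emb': 'object', 'col_2_emb': 'object'})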
Also, the whole dataframe is being built on the client (with python string objects and lists!) rather than chunkwise in worker tasks, which is a clear anti-pattern. Fixing that means calling an IO function in dask-cudf (e.g. read_parquet or from_map). You also call .persist(), which seems like a bad idea if you are having memory trouble: with it, all of the memory is allocated up front, whereas without it each chunk would be loaded, processed, and released again (assuming you have done the first part of this paragraph).
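
As a sketch of building the partitions on the workers instead of on the client (from_map is available in dask >= 2022.05; the chunk builder make_chunk and the chunk size are hypothetical stand-ins for whatever actually produces your data):

import dask.dataframe as dd

def make_chunk(i, chunk_size=1_250_000):
    # each call builds one partition directly inside a worker task
    start = i * chunk_size
    return cudf.DataFrame({
        'col_1': [f"This is sentence {x}" for x in range(start, start + chunk_size)],
        'col_2': [f"That is another sentence {x}" for x in range(start, start + chunk_size)],
    })

meta = make_chunk(0, chunk_size=1).head(0)              # empty cudf frame with the right dtypes
dask_df = dd.from_map(make_chunk, range(8), meta=meta)  # 8 lazy partitions, no persist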
Finally, you .compute() the whole thing, which materialises a pile of objects somewhere. This is surely not the end point of the processing, so why compute? It copies the pieces and concatenates them into one dataframe: unless you have already done a lot of aggregation, that is very wasteful of memory.
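
If the embeddings are ultimately headed for storage anyway, writing the partitions out directly avoids ever assembling the full result in one place, e.g. (the output path is a placeholder):

new_dask_df.to_parquet("/tmp/embeddings_out/")  # hypothetical path; each partition is written and then released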
