I want to process some text data with sentence-transformers (generating embeddings for the text) on GCP, in a Jupyter notebook, using multiple GPUs (2x T4, 15 GB each) and 16 vCPUs (60 GB RAM).
The data is not that large, but the workers keep getting restarted because of a memory leak, even after setting the garbage-collection (malloc trim) threshold from the shell.
My code:
# run export MALLOC_TRIM_THRESHOLD_=65536 from shell before starting dask cluster
!pip install sentence-transformers
import os
import glob
import numpy as np
import gc
import cudf
import dask_cudf
import cupy
import rmm
from dask.distributed import Client, wait, get_worker, get_client
from dask_cuda import LocalCUDACluster
cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES="0,1", n_workers=2, threads_per_worker=4,
                           memory_limit="15GB", device_memory_limit="24GB",
                           rmm_pool_size="4GB", rmm_maximum_pool_size="15GB")
client = Client(cluster)
print(client.run(os.getenv, "MALLOC_TRIM_THRESHOLD_")) # 65536
initial_pool_size = 4*10**9
maximum_pool_size = 15*10**9
rmm.reinitialize(pool_allocator=True, managed_memory=True, initial_pool_size=initial_pool_size,
                 maximum_pool_size=maximum_pool_size, devices=[0,1], logging=True,
                 log_file_name='./tmp/logs/test_sbert_distributed.log')
import dask.dataframe as dd
import pandas as pd
from dask.multiprocessing import get
import random
df = pd.DataFrame({'col_1': ["This is sentence " + str(x) for x in random.sample(range(10**7), 10**7)],
                   'col_2': ["That is another sentence " + str(x) for x in random.sample(range(10**7), 10**7)]})
cudf_df = cudf.DataFrame.from_pandas(df)
dask_df = dask_cudf.from_cudf(cudf_df, npartitions=8)
from sentence_transformers import SentenceTransformer
import numpy as np
sbert_model = SentenceTransformer('all-MiniLM-L6-v2')
def test_f_str(df, args):
    col1, col2, chunks = args
    for col in [col1, col2]:
        emb = sbert_model.encode(sentences=df[col].to_arrow().to_pylist(), batch_size=1250, show_progress_bar=True)
        semb = np.array([str(x) for x in emb])
        emb_array = dask.array.from_array(semb, chunks=chunks)
        df[col+'_emb'] = emb_array
    return df
chunks = dask_df.map_partitions(lambda x: len(x)).compute().to_numpy()
print(chunks, type(chunks))
[1250000 1250000 1250000 1250000 1250000 1250000 1250000 1250000] <class 'numpy.ndarray'>
dask_df.npartitions, dask_df.persist()
(8, <dask_cudf.DataFrame | 8 tasks | 8 npartitions>)
new_dask_df = dask_df.map_partitions(test_f_str,
                                     args=('col_1', 'col_2', chunks),
                                     meta={'col_1': 'object',
                                           'col_2': 'object',
                                           'col_1_emb': 'object',
                                           'col_2_emb': 'object'})
new_dask_df.dtypes
col_1 object
col_2 object
col_1_emb object
col_2_emb object
dtype: object
new_dask_df.compute() # error: WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory
# may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information.
# -- Unmanaged memory: 9.96 GiB -- Worker memory limit: 13.97 GiB
I have tried many of the suggestions from the links below (the manual memory-trimming approach they describe is sketched after the list), but nothing has helped with this problem.
https://www.coiled.io/blog/tackling-unmanaged-memory-with-dask
https://stackoverflow.com/questions/71203077/why-does-dask-distributed-auto-memory-trimming-not-work
https://github.com/dask/distributed/issues/5971
https://stackoverflow.com/questions/72180961/dask-memory-leak-workaround
https://stackoverflow.com/questions/58275476/dask-distributed-workers-always-leak-memory-when-running-many-tasks
https://distributed.dask.org/en/stable/worker-memory.html
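One concrete thing those pages recommend, as a minimal sketch (it assumes the client object created above and a Linux glibc worker environment), is to manually ask glibc to hand freed memory back to the OS on every worker:

import ctypes

def trim_memory() -> int:
    # Ask glibc to release free'd heap arenas back to the OS (Linux only).
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)

# Run on every worker; returns 1 per worker if memory was actually released.
client.run(trim_memory)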
Can someone point out what I am missing?
============= UPDATE ===================
I am using the dashboards from https://developer.nvidia.com/blog/gpu-dashboards-in-jupyter-lab/, but the "Worker memory" plot (bytes stored per worker) does not show any "unmanaged" or "leaked" memory.
However, going by https://distributed.dask.org/en/stable/worker-memory.html#using-the-dashboard-to-monitor-memory-usage, the "GPU memory" plot turns orange and indicates spilled memory.
How can I confirm whether it is CPU or GPU memory that is leaking?
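One way to check where the growth is (a rough sketch, assuming psutil and pynvml are installed and that dask-cuda has pinned one GPU per worker via CUDA_VISIBLE_DEVICES) is to report both host RSS and GPU usage from each worker:

import os
import psutil
import pynvml

def report_memory():
    # Host (CPU) memory: resident set size of this worker process.
    rss_gb = psutil.Process(os.getpid()).memory_info().rss / 1e9
    # GPU memory: NVML enumerates physical devices, so map back through the
    # CUDA_VISIBLE_DEVICES value that dask-cuda set for this worker.
    gpu_index = int(os.environ.get("CUDA_VISIBLE_DEVICES", "0").split(",")[0])
    pynvml.nvmlInit()
    handle = pynvml.nvmlDeviceGetHandleByIndex(gpu_index)
    used_gb = pynvml.nvmlDeviceGetMemoryInfo(handle).used / 1e9
    return {"cpu_rss_gb": round(rss_gb, 2), "gpu_used_gb": round(used_gb, 2)}

print(client.run(report_memory))  # one dict per worker; compare before/after map_partitions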
1 Answer
Your error suggests that it is the system RAM filling up, not the GPU. Although there is a lot of code here (too much to take in), you do show how dask_df is created. I do not think the following line can be OK:
emb_array = dask.array.from_array(semb, chunks=chunks)
This happens within the context of map_partitions, so the input here should be pandas/cudf and the output should be pandas/cudf or numpy/cupy. You should not be calling the dask API from within a function that dask itself runs as a task. Furthermore, the whole dataframe is built on the client (with Python string objects and lists!) rather than chunk-wise in worker tasks, which is a clear anti-pattern; instead, call an IO function in dask-cudf (e.g. read_parquet or from_map). A sketch of both fixes follows below.
You also call .persist(), which seems a bad idea if you are having memory trouble: with it, all of the memory is allocated up front, whereas without it each chunk would be loaded, processed, and then have its memory released (assuming you have followed the advice in the first part of this paragraph).
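A minimal sketch of what those two suggestions could look like together. It assumes the input already lives in parquet files (the path, and dask_cudf.read_parquet as the IO function, are illustrative choices) and keeps the per-partition function purely in cudf/numpy/Python, with no dask calls inside:

import dask_cudf
from sentence_transformers import SentenceTransformer

sbert_model = SentenceTransformer('all-MiniLM-L6-v2')

def encode_partition(df):
    # Runs as a dask task: input is one cudf DataFrame partition,
    # output is a cudf DataFrame -- no dask API is used in here.
    for col in ['col_1', 'col_2']:
        emb = sbert_model.encode(df[col].to_arrow().to_pylist(), batch_size=1250)
        df[col + '_emb'] = [str(x) for x in emb]  # same string encoding as the question
    return df

# Partitions are created on the workers by an IO function, not on the client.
dask_df = dask_cudf.read_parquet('input_data/')  # hypothetical input path

new_dask_df = dask_df.map_partitions(
    encode_partition,
    meta={'col_1': 'object', 'col_2': 'object',
          'col_1_emb': 'object', 'col_2_emb': 'object'})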
Finally, you .compute() the whole thing, which is exactly where a pile of objects gets materialised. Surely this is not the last step of the processing, so why compute at all? It copies the pieces and concatenates them into a single dataframe on the client: very wasteful of memory, unless you have already done substantial aggregation.
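If the embeddings themselves are the end product, writing each partition out (or pulling back only a small aggregate) avoids concatenating everything on the client; the output path below is hypothetical:

# Each partition is encoded and written independently, then its memory can be released.
new_dask_df.to_parquet('embeddings_out/')

# Bring back only small results, e.g. the number of rows per partition:
print(new_dask_df.map_partitions(len).compute())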