I have the following code:
import sentence_transformers
import multiprocessing
from tqdm import tqdm
from multiprocessing import Pool
import numpy as np
embedding_model = sentence_transformers.SentenceTransformer('sentence-transformers/all-mpnet-base-v2')
data = [[100227, 7382501.0, 'view', 30065006, False, ''],
[100227, 7382501.0, 'view', 57072062, True, ''],
[100227, 7382501.0, 'view', 66405922, True, ''],
[100227, 7382501.0, 'view', 5221475, False, ''],
[100227, 7382501.0, 'view', 63283995, True, '']]
df_text = dict()
df_text[7382501] = {'title': 'The Geography of the Internet Industry, Venture Capital, Dot-coms, and Local Knowledge - MATTHEW A. ZOOK', 'abstract': '23', 'highlight': '12'}
df_text[30065006] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
df_text[57072062] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
df_text[66405922] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
df_text[5221475] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
df_text[63283995] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
# Define the function to be executed in parallel
def process_data(chunk):
    results = []
    for row in chunk:
        print(row[0])
        work_id = row[1]
        mentioning_work_id = row[3]
        print(work_id)
        if work_id in df_text and mentioning_work_id in df_text:
            title1 = df_text[work_id]['title']
            title2 = df_text[mentioning_work_id]['title']
            embeddings_title1 = embedding_model.encode(title1, convert_to_numpy=True)
            embeddings_title2 = embedding_model.encode(title2, convert_to_numpy=True)
            similarity = np.matmul(embeddings_title1, embeddings_title2.T)
            results.append([row[0], row[1], row[2], row[3], row[4], similarity])
        else:
            continue
    return results
# Define the number of CPU cores to use
num_cores = multiprocessing.cpu_count()
# Split the data into chunks
chunk_size = len(data) // num_cores
chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]
# Create a pool of worker processes
pool = multiprocessing.Pool(processes=num_cores)
results = []
with tqdm(total=len(data)) as pbar:
    for i, result_chunk in enumerate(pool.map(process_data, chunks)):
        # Update the progress bar
        pbar.update()
        # Add the results to the list
        results += result_chunk
# Concatenate the results
final_result = results
I am running this code on Amazon SageMaker, and it runs fine on an instance with 2 CPUs: it shows the progress bar and everything. But I want to run it on a larger instance with more CPUs. On the instance with more CPUs it just hangs, with no progress at all. When I finally stop the kernel, I get this error:
---------------------------------------------------------------------------
KeyboardInterrupt Traceback (most recent call last)
<ipython-input-18-19449c86abd3> in <module>
1 results = []
2 with tqdm(total=len(chunks)) as pbar:
----> 3 for i, result_chunk in enumerate(pool.map(process_data, chunks)):
4 # Update the progress bar
5 pbar.update()
/opt/conda/lib/python3.7/multiprocessing/pool.py in map(self, func, iterable, chunksize)
266 in a list that is returned.
267 '''
--> 268 return self._map_async(func, iterable, mapstar, chunksize).get()
269
270 def starmap(self, func, iterable, chunksize=None):
/opt/conda/lib/python3.7/multiprocessing/pool.py in get(self, timeout)
649
650 def get(self, timeout=None):
--> 651 self.wait(timeout)
652 if not self.ready():
653 raise TimeoutError
/opt/conda/lib/python3.7/multiprocessing/pool.py in wait(self, timeout)
646
647 def wait(self, timeout=None):
--> 648 self._event.wait(timeout)
649
650 def get(self, timeout=None):
/opt/conda/lib/python3.7/threading.py in wait(self, timeout)
550 signaled = self._flag
551 if not signaled:
--> 552 signaled = self._cond.wait(timeout)
553 return signaled
554
/opt/conda/lib/python3.7/threading.py in wait(self, timeout)
294 try: # restore state no matter what (e.g., KeyboardInterrupt)
295 if timeout is None:
--> 296 waiter.acquire()
297 gotit = True
298 else:
KeyboardInterrupt:
This leads me to believe it is waiting on some resource? I'm not sure. Any help with this would be greatly appreciated. Also, while this code is running I see a lot of core dump files being created in the SageMaker file explorer.
1 Answer
I'm not sure whether this solution applies on Amazon SageMaker, but I have found that setting the multiprocessing start method to 'spawn' avoids the deadlock inside the sentence transformer's tokenizer, by increasing the isolation between the processes.