python-3.x multiprocessing pool.map() does not work on more than 2 CPUs

6vl6ewon · posted 2023-05-02 in Python

I have the following code:

import sentence_transformers
import multiprocessing
from tqdm import tqdm
from multiprocessing import Pool
import numpy as np

embedding_model = sentence_transformers.SentenceTransformer('sentence-transformers/all-mpnet-base-v2')

data = [[100227, 7382501.0, 'view', 30065006, False, ''],
 [100227, 7382501.0, 'view', 57072062, True, ''],
 [100227, 7382501.0, 'view', 66405922, True, ''],
 [100227, 7382501.0, 'view', 5221475, False, ''],
 [100227, 7382501.0, 'view', 63283995, True, '']]

df_text = dict()
df_text[7382501] = {'title': 'The Geography of the Internet Industry, Venture Capital, Dot-coms, and Local Knowledge - MATTHEW A. ZOOK', 'abstract': '23', 'highlight': '12'}
df_text[30065006] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
df_text[57072062] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
df_text[66405922] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
df_text[5221475] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
df_text[63283995] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}

# Define the function to be executed in parallel
def process_data(chunk):
    results = []
    for row in chunk:
        print(row[0])
        work_id = row[1]
        mentioning_work_id = row[3]
        print(work_id)

        if work_id in df_text and mentioning_work_id in df_text:
            title1 = df_text[work_id]['title']
            title2 = df_text[mentioning_work_id]['title']
            embeddings_title1 = embedding_model.encode(title1,convert_to_numpy=True)
            embeddings_title2 = embedding_model.encode(title2,convert_to_numpy=True)
            
            similarity = np.matmul(embeddings_title1, embeddings_title2.T)
            
            results.append([row[0],row[1],row[2],row[3],row[4],similarity])
        else:
            continue
    return results

# Define the number of CPU cores to use
num_cores = multiprocessing.cpu_count()

# Split the data into chunks
chunk_size = len(data) // num_cores
chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]

# Create a pool of worker processes
pool = multiprocessing.Pool(processes=num_cores)

results = []
with tqdm(total=len(data)) as pbar:
    for i, result_chunk in enumerate(pool.map(process_data, chunks)):
        # Update the progress bar
        pbar.update()
        # Add the results to the list
        results += result_chunk

# Concatenate the results
final_result = results

I am running this code on Amazon SageMaker, and it works fine on an instance with 2 CPUs: it shows the progress bar and everything. But I want to run it on a larger instance with more CPUs, and there it just hangs, with no progress at all. When I finally interrupt the kernel, I get this error:

---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-18-19449c86abd3> in <module>
      1 results = []
      2 with tqdm(total=len(chunks)) as pbar:
----> 3     for i, result_chunk in enumerate(pool.map(process_data, chunks)):
      4         # Update the progress bar
      5         pbar.update()

/opt/conda/lib/python3.7/multiprocessing/pool.py in map(self, func, iterable, chunksize)
    266         in a list that is returned.
    267         '''
--> 268         return self._map_async(func, iterable, mapstar, chunksize).get()
    269 
    270     def starmap(self, func, iterable, chunksize=None):

/opt/conda/lib/python3.7/multiprocessing/pool.py in get(self, timeout)
    649 
    650     def get(self, timeout=None):
--> 651         self.wait(timeout)
    652         if not self.ready():
    653             raise TimeoutError

/opt/conda/lib/python3.7/multiprocessing/pool.py in wait(self, timeout)
    646 
    647     def wait(self, timeout=None):
--> 648         self._event.wait(timeout)
    649 
    650     def get(self, timeout=None):

/opt/conda/lib/python3.7/threading.py in wait(self, timeout)
    550             signaled = self._flag
    551             if not signaled:
--> 552                 signaled = self._cond.wait(timeout)
    553             return signaled
    554 

/opt/conda/lib/python3.7/threading.py in wait(self, timeout)
    294         try:    # restore state no matter what (e.g., KeyboardInterrupt)
    295             if timeout is None:
--> 296                 waiter.acquire()
    297                 gotit = True
    298             else:

KeyboardInterrupt:

This makes me think it is waiting on some resource? Not sure. Any help with this would be greatly appreciated. Also, when I run this code, I see a lot of 'core' files being created in the SageMaker file explorer.


eiee3dmh1#

I'm not sure whether this solution applies on Amazon SageMaker, but I found that setting the start method to 'spawn' avoids the deadlock inside the sentence transformer's tokenizer by increasing the isolation between the processes.

import sentence_transformers
import multiprocessing
from tqdm import tqdm
import numpy as np

embedding_model = sentence_transformers.SentenceTransformer('sentence-transformers/all-mpnet-base-v2')

data = [[100227, 7382501.0, 'view', 30065006, False, ''],
 [100227, 7382501.0, 'view', 57072062, True, ''],
 [100227, 7382501.0, 'view', 66405922, True, ''],
 [100227, 7382501.0, 'view', 5221475, False, ''],
 [100227, 7382501.0, 'view', 63283995, True, '']]

df_text = dict()
df_text[7382501] = {'title': 'The Geography of the Internet Industry, Venture Capital, Dot-coms, and Local Knowledge - MATTHEW A. ZOOK', 'abstract': '23', 'highlight': '12'}
df_text[30065006] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
df_text[57072062] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
df_text[66405922] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
df_text[5221475] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
df_text[63283995] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}

# Define the function to be executed in parallel
def process_data(chunk):
    results = []
    for row in chunk:
        print(row[0])
        work_id = row[1]
        mentioning_work_id = row[3]
        print(work_id)

        if work_id in df_text and mentioning_work_id in df_text:
            title1 = df_text[work_id]['title']
            title2 = df_text[mentioning_work_id]['title']
            embeddings_title1 = embedding_model.encode(title1,convert_to_numpy=True)
            embeddings_title2 = embedding_model.encode(title2,convert_to_numpy=True)

            similarity = np.matmul(embeddings_title1, embeddings_title2.T)

            results.append([row[0],row[1],row[2],row[3],row[4],similarity])
        else:
            continue
    return results

if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')
    # Define the number of CPU cores to use
    # num_cores = multiprocessing.cpu_count()
    num_cores = 4

    # Split the data into chunks
    chunk_size = len(data) // num_cores
    chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]

    # Create a pool of worker processes

    results = []
    with multiprocessing.Pool(processes=num_cores) as pool:
        with tqdm(total=len(data)) as pbar:
            for i, result_chunk in enumerate(pool.map(process_data, chunks)):
                # Update the progress bar
                pbar.update(len(result_chunk))
                # Add the results to the list
                results += result_chunk

    # Concatenate the results
    final_result = results
    print(final_result)
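
Two notes on this approach, as additions on top of the answer above rather than part of it: with 'spawn', every worker re-imports the module and loads its own copy of the model, and each worker also starts PyTorch's own thread pool, so many workers on one machine can oversubscribe the CPU. The chunking also breaks if num_cores exceeds len(data), because chunk_size becomes 0 and the range() step raises a ValueError. A minimal sketch of both adjustments, reusing data and process_data from the answer above (the init_worker helper, the torch.set_num_threads(1) call, and the max(1, ...) guard are illustrative assumptions, not from the original answer):

import multiprocessing
import torch

def init_worker():
    # Limit PyTorch to one thread per worker process so that many
    # workers do not oversubscribe the CPU of a single instance.
    torch.set_num_threads(1)

if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')
    num_cores = 4

    # Guard against chunk_size == 0 when there are more cores than rows
    chunk_size = max(1, len(data) // num_cores)
    chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

    results = []
    with multiprocessing.Pool(processes=num_cores, initializer=init_worker) as pool:
        for result_chunk in pool.map(process_data, chunks):
            results += result_chunk

Whether the per-worker thread cap actually helps depends on the instance size; it is only meant to show that each spawned worker otherwise runs PyTorch with its full default thread count.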
