Overwriting objects in Python multiprocessing

hts6caw3  posted 2023-03-13 in Python

I have a problem with multiprocessing in Python that has me completely stumped. The outline of the script I'm writing is as follows:
1. Create queues to pass data between processes
2. Create a TF-IDF vectorizer and fit it on some text
3. Create some worker processes that wait for input from the queue
4. Feed data into the queue so the workers can find similar data points using cosine similarity
5. Wait for all processes to signal that they are finished, then exit
The code is shown below. The problem I'm running into is that at some point during execution the queue_in object gets replaced by a sparse matrix. I'm baffled as to how this can happen, because I never explicitly overwrite the value of queue_in. The second problem is that the line printing f'Hello from process {os.getpid()}' is executed multiple times; I assumed that once a child process started executing the function it would stay in its loop until it received the poison pill from queue_in.
I've googled around and found nothing. I'm also new to multiprocessing, so any help understanding why the queue_in object is being replaced by another object would be greatly appreciated.
The stack trace is below the code.

import pandas as pd
import numpy as np
import os
from sklearn.metrics.pairwise import cosine_similarity
import multiprocessing as mp
from tqdm.notebook import tqdm
from sklearn.feature_extraction.text import TfidfVectorizer as TFIDF
from nltk.stem import WordNetLemmatizer
import time

model_dir = '/home/hma94128/bert_base_uncased/'
udata_dir = '/home/hma94128/synthetic_data/unlabeled_data.csv/unlabeled.csv'
rdata_dir = '/home/hma94128/training_data_and_labels_no_other.csv'
chunksize=100
workers = 5
row_limit = 200
cutoff=.8

def stream_text():
    '''Streams single line of text for training the TFIDF vectorizer'''
    lemmatizer = WordNetLemmatizer()
    with pd.read_csv(rdata_dir, chunksize=1) as reader:
        for chunk in reader:
            text = chunk['text'].tolist()[0]
            text = [lemmatizer.lemmatize(x) for x in text.split(' ')]
            text = ' '.join(text)
            yield text
            
#TODO: function to calculate cosine similarity and capture rows greater than cutoff, then write to file
def cosine_similarity(queue_in,
                      queue_out,
                      cutoff=cutoff,):
    '''Funcion that iterates over the reference dataframe
    to identify similar datapoints in unlabeled_data'''
    print(f'Hello from process {os.getpid()}')
    while True:
        rows_written = 0
        data_package = queue_in.get()
        print('Got chunk!')
        #if poison pill comes through then terminate
        if data_package == None:
            queue_out.put(1)
            break
        row = data_package[0]
        unlabeled_data = data_package[1]
        for idx in row.index:
            #get text data
            text_data = row.at[idx,'text']
            #get labels
            labels = row.iloc[idx:idx+1,3:]
            record_id = row.at[idx,'record_id']
            #transform text to vector
            print('Transforming text')
            reference_vector = tfidf.transform([text_data])
            #make list of text to pass to model
            unlabeled_text = unlabeled_data['text_data'].tolist()
            unlabeled_vectors = tfidf.transform(unlabeled_text)
            #calculate cosine similiarity
            similarity_matrix = cosine_similarity(unlabeled_vectors, reference_vector)
            cidx = np.nonzero(similarity_matrix>cutoff)[0]
            if cidx.shape[0]>0:
                #increment number of rows written
                rows_written += cidx.shape[0]
                print(f'Found {similarity_matrix[np.nonzero(similarity_matrix>cutoff)[0]].shape[0]}!')
                #determine where the CS score is higher than cutoff
                #np.nonzero()[0] returns the indices where the criteria is met and get text
                close_enough = unlabeled_data[['text_data']].iloc[cidx,:]

                #make block of labels to concatenate
                labels = pd.concat([labels for x in range(close_enough.shape[0])])
                close_enough['record_id'] = record_id
                close_enough['source'] = 'synthetic'
                close_enough['cosine_sim'] = similarity_matrix[cidx]
                close_enough = close_enough.rename(columns={'text_data':'text'})
                output = pd.concat([close_enough, labels], axis=1)
                
                #if data already written then no header
                if rows_written > 0:
                    output.to_csv(f'synthetic_data_output_{os.getpid()}.csv',index=False, header=False, mode='a')
                else:
                    output.to_csv(f'synthetic_data_output_{os.getpid()}.csv',index=False, mode='a')
                print('Waiting to work on new block!')
            else:
                print(f'{os.getpid()} found no rows :( moving on!')
                

if __name__ == '__main__':
    #make queues
    print('Making queues')
    queue_in = mp.Queue()
    queue_out = mp.Queue()

    #make tfidf vectorizer
    print('Making TFIDF vectorizer')
    tfidf = TFIDF(stop_words='english', ngram_range=(1,1), max_df=.9, max_features=1000)
    stream = stream_text()
    tfidf = tfidf.fit(stream)

    #start worker processes
    print('Starting workers')
    processes = []
    for n in range(workers):
        p = mp.Process(target=cosine_similarity, args=(queue_in, queue_out))
        processes.append(p)
        p.start()

    print('Reading in data to iterate over rows...')
    rdata = pd.read_csv(rdata_dir)
    print('Feeding jobs')
    for idx in tqdm(range(rdata.shape[0])):
        row_count = 0
        row = rdata.iloc[idx:idx+1,:]
        with pd.read_csv(udata_dir, on_bad_lines='skip', chunksize=chunksize) as reader:
            for chunk in reader:
                #don't overload it
                while queue_in.qsize()>20:
                    time.sleep(1)
                data_block = (row, chunk)
                queue_in.put(data_block)

    finished = 0
    print('Waiting for cleanup')
    while finished < workers:
        output = queue_out.get()
        finished += output
    print('Done!')

[screenshot of the stack trace]
I tried googling why the queue_in object could be replaced, but didn't find anything. I also tried using mp.Manager() and creating the queues with manager.Queue(), but the problem persists.

biswetbf 1#

Clearly you have two functions named cosine_similarity:
1. one imported from sklearn.metrics.pairwise
2. the other defined by yourself and used as the target when starting each Process
Since the second one is defined later, it rebinds (shadows) the first.
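The shadowing itself has nothing to do with multiprocessing: a def that appears later in a module simply rebinds a name that was imported earlier. A minimal standalone illustration (the names here are made up for the example, not taken from your script):

from math import sqrt   # module-level name 'sqrt' refers to math.sqrt

def sqrt(x):             # defined later, so it rebinds the same module-level name
    return 'shadowed!'

print(sqrt(4))           # prints 'shadowed!' instead of 2.0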
So when cosine_similarity is called inside the new Process, you are actually calling your own cosine_similarity, not the imported one. In your worker that call is similarity_matrix = cosine_similarity(unlabeled_vectors, reference_vector), so the worker re-enters itself with the sparse matrix bound to its queue_in parameter, which is exactly why queue_in appears to be replaced by a sparse matrix and why the 'Hello from process ...' line prints more than once.
The solution is therefore simple: rename your own cosine_similarity to something else, e.g. calculate_cosine_similarity.
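A minimal sketch of that rename (only the parts relevant to the name clash are shown; the fitted tfidf vectorizer and the rest of your row handling are assumed to stay as in your script):

import os
from sklearn.metrics.pairwise import cosine_similarity   # the sklearn function keeps its name

def calculate_cosine_similarity(queue_in, queue_out, cutoff=0.8):
    '''Worker: pull (row, chunk) packages off queue_in until the poison pill arrives.'''
    print(f'Hello from process {os.getpid()}')
    while True:
        data_package = queue_in.get()
        if data_package is None:          # poison pill -> report completion and exit
            queue_out.put(1)
            break
        row, unlabeled_data = data_package
        # ... build reference_vector and unlabeled_vectors with tfidf.transform(...) ...
        # this now unambiguously calls sklearn's function, not the worker itself:
        # similarity_matrix = cosine_similarity(unlabeled_vectors, reference_vector)

# and in the main block the Process target changes accordingly:
# p = mp.Process(target=calculate_cosine_similarity, args=(queue_in, queue_out))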
