I have a question about multiprocessing in Python that has me completely stumped. The outline of the script I'm writing is:
1. Create queues to pass data between processes
2. Create a TF-IDF vectorizer and fit it on some text
3. Create some worker processes that wait for input from the queue
4. Pass data into the queue so the workers can find similar data points using cosine similarity
5. Wait for all processes to signal completion, then exit
The code is below. The problem I'm running into is that at some point during execution the queue_in object gets replaced by a sparse matrix. I'm confused about how this can happen, since I never explicitly overwrite the value of queue_in. The second problem is that the line printing f'Hello from process {os.getpid()}' is executed multiple times; I assumed that once a child process started executing the function, it would stay in its loop until it received the "poison pill" from queue_in.
I've searched Google and found nothing. I'm also new to multiprocessing, so any help understanding why the queue_in object is being replaced by another object would be much appreciated.
The stack trace is below the code.
import pandas as pd
import numpy as np
import os
from sklearn.metrics.pairwise import cosine_similarity
import multiprocessing as mp
from tqdm.notebook import tqdm
from sklearn.feature_extraction.text import TfidfVectorizer as TFIDF
from nltk.stem import WordNetLemmatizer
import time
model_dir = '/home/hma94128/bert_base_uncased/'
udata_dir = '/home/hma94128/synthetic_data/unlabeled_data.csv/unlabeled.csv'
rdata_dir = '/home/hma94128/training_data_and_labels_no_other.csv'
chunksize=100
workers = 5
row_limit = 200
cutoff=.8
def stream_text():
    '''Streams single line of text for training the TFIDF vectorizer'''
    lemmatizer = WordNetLemmatizer()
    with pd.read_csv(rdata_dir, chunksize=1) as reader:
        for chunk in reader:
            text = chunk['text'].tolist()[0]
            text = [lemmatizer.lemmatize(x) for x in text.split(' ')]
            text = ' '.join(text)
            yield text
#TODO: function to calculate cosine similarity and capture rows greater than cutoff, then write to file
def cosine_similarity(queue_in,
                      queue_out,
                      cutoff=cutoff,):
    '''Function that iterates over the reference dataframe
    to identify similar datapoints in unlabeled_data'''
    print(f'Hello from process {os.getpid()}')
    while True:
        rows_written = 0
        data_package = queue_in.get()
        print('Got chunk!')
        #if poison pill comes through then terminate
        if data_package == None:
            queue_out.put(1)
            break
        row = data_package[0]
        unlabeled_data = data_package[1]
        for idx in row.index:
            #get text data
            text_data = row.at[idx,'text']
            #get labels
            labels = row.iloc[idx:idx+1,3:]
            record_id = row.at[idx,'record_id']
            #transform text to vector
            print('Transforming text')
            reference_vector = tfidf.transform([text_data])
            #make list of text to pass to model
            unlabeled_text = unlabeled_data['text_data'].tolist()
            unlabeled_vectors = tfidf.transform(unlabeled_text)
            #calculate cosine similarity
            similarity_matrix = cosine_similarity(unlabeled_vectors, reference_vector)
            cidx = np.nonzero(similarity_matrix>cutoff)[0]
            if cidx.shape[0]>0:
                #increment number of rows written
                rows_written += cidx.shape[0]
                print(f'Found {similarity_matrix[np.nonzero(similarity_matrix>cutoff)[0]].shape[0]}!')
                #determine where the CS score is higher than cutoff
                #np.nonzero()[0] returns the indices where the criteria is met and get text
                close_enough = unlabeled_data[['text_data']].iloc[cidx,:]
                #make block of labels to concatenate
                labels = pd.concat([labels for x in range(close_enough.shape[0])])
                close_enough['record_id'] = record_id
                close_enough['source'] = 'synthetic'
                close_enough['cosine_sim'] = similarity_matrix[cidx]
                close_enough = close_enough.rename(columns={'text_data':'text'})
                output = pd.concat([close_enough, labels], axis=1)
                #if data already written then no header
                if rows_written > 0:
                    output.to_csv(f'synthetic_data_output_{os.getpid()}.csv',index=False, header=False, mode='a')
                else:
                    output.to_csv(f'synthetic_data_output_{os.getpid()}.csv',index=False, mode='a')
                print('Waiting to work on new block!')
            else:
                print(f'{os.getpid()} found no rows :( moving on!')
if __name__ == '__main__':
    #make queues
    print('Making queues')
    queue_in = mp.Queue()
    queue_out = mp.Queue()
    #make tfidf vectorizer
    print('Making TFIDF vectorizer')
    tfidf = TFIDF(stop_words='english', ngram_range=(1,1), max_df=.9, max_features=1000)
    stream = stream_text()
    tfidf = tfidf.fit(stream)
    #start worker processes
    print('Starting workers')
    processes = []
    for n in range(workers):
        p = mp.Process(target=cosine_similarity, args=(queue_in, queue_out))
        processes.append(p)
        p.start()
    print('Reading in data to iterate over rows...')
    rdata = pd.read_csv(rdata_dir)
    print('Feeding jobs')
    for idx in tqdm(range(rdata.shape[0])):
        row_count = 0
        row = rdata.iloc[idx:idx+1,:]
        with pd.read_csv(udata_dir, on_bad_lines='skip', chunksize=chunksize) as reader:
            for chunk in reader:
                #don't overload it
                while queue_in.qsize()>20:
                    time.sleep(1)
                data_block = (row, chunk)
                queue_in.put(data_block)
    finished = 0
    print('Waiting for cleanup')
    while finished < workers:
        output = queue_out.get()
        finished += output
    print('Done!')
[stack trace screenshot]
I tried googling why the queue_in object could get replaced, with no luck. I also tried using mp.Manager() and creating the queues with manager.Queue(), but the problem persists.
1 Answer
Evidently you have two functions named cosine_similarity:
1. one imported from sklearn.metrics.pairwise
2. one defined by yourself and used as the target of the Process you start
Because the second one is defined later, it overrides the first. So when cosine_similarity is called inside the new Process, it is your own cosine_similarity that gets called, not the imported one: the worker re-enters itself with queue_in bound to the sparse matrix unlabeled_vectors and queue_out bound to reference_vector, which is why queue_in suddenly "becomes" a sparse matrix and why the "Hello from process ..." line prints more than once. The solution is simple: just rename your own cosine_similarity to something else, e.g. calculate_cosine_similarity.
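For illustration, here is a minimal, self-contained sketch of the renamed worker; the name calculate_cosine_similarity and the toy vectors are examples, not from the original post:

import multiprocessing as mp
import os
import numpy as np
# the import keeps its name because the worker no longer shadows it
from sklearn.metrics.pairwise import cosine_similarity

def calculate_cosine_similarity(queue_in, queue_out):
    '''Worker: pull (candidate_vectors, reference_vector) pairs off the queue.'''
    print(f'Hello from process {os.getpid()}')
    while True:
        item = queue_in.get()
        if item is None:                   # poison pill -> signal completion and exit
            queue_out.put(1)
            break
        candidate_vectors, reference_vector = item
        # now resolves to sklearn's function, not to the worker itself
        scores = cosine_similarity(candidate_vectors, reference_vector)
        print(scores.ravel())

if __name__ == '__main__':
    queue_in, queue_out = mp.Queue(), mp.Queue()
    p = mp.Process(target=calculate_cosine_similarity, args=(queue_in, queue_out))
    p.start()
    queue_in.put((np.eye(3), np.ones((1, 3))))   # toy dense data
    queue_in.put(None)                           # poison pill
    queue_out.get()                              # wait for the worker's done signal
    p.join()

With the worker renamed, the queues keep their original values and the greeting prints exactly once per process.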