在发布这个问题之前,我读了一个老问题Why does this python multiprocessing script slow down after a while?和许多其他问题。他们没有回答我的问题。
脚本的想法。该脚本在一个串行循环中生成数组,256 x256。数组的元素是从一个包含相关参数的字典的列表中逐个计算的,每个数组元素一个字典(每个列表总共256 x256)。列表是我启用并行计算的方式。
问题:一开始,数据的生成速度从十几秒加快到几秒,然后,经过几次迭代,它开始减慢几分之一秒,每生成一个新数组,就需要永远计算任何东西。
附加信息。
1.我使用的是pool.map函数。在做了一些小的更改以确定正在计算的元素之后,我还尝试使用map_async。不幸的是,它较慢,因为每次完成计算数组时都需要初始化池。
1.当使用www.example.com时pool.map,我在任何操作开始之前初始化池一次。通过这种方式,我希望与map_async相比,保存初始化池的时间。
- CPU显示低使用率,高达~ 18%。
1.在我的例子中,硬盘驱动器不是瓶颈。所有计算所需的数据都在RAM中。我也不会将数据保存到硬盘驱动器上,将所有数据都保存在RAM中。
1.我还检查了如果我使用不同数量的核心,2-24,问题是否仍然存在。也没有改变。
1.我做了一些额外的测试,运行并终止一个池,a.每次生成一个数组,b.每10个数组。我注意到,在每种情况下,代码的执行速度都比前一个池的执行时间慢,即如果前一个池的执行速度慢到5s,另一个池的执行速度将是5.X,依此类推。唯一一次执行速度不慢的是当我串行运行代码时。
1.工作环境:Windows 10,Python 3.7,conda 4.8.2,Spyder 4.
问题:为什么在只涉及CPU和RAM的情况下,多处理会在一段时间后变慢(没有硬盘驱动器变慢)?任何想法?
更新代码:
import multiprocessing as mp
from tqdm import tqdm
import numpy as np
import random
def wrapper_(arg):
return tmp.generate_array_elements(
self=arg['self'],
nu1=arg['nu1'],
nu2=arg['nu2'],
innt=arg['innt'],
nu1exp=arg['nu1exp'],
nu2exp=arg['nu2exp'],
ii=arg['ii'],
jj=arg['jj'],
llp=arg['self'].llp,
rr=arg['self'].rr,
)
class tmp:
def __init__(self, multiprocessing, length, n_of_arrays):
self.multiprocessing = multiprocessing
self.inshape = (length,length)
self.length = length
self.ll_len = n_of_arrays
self.num_cpus = 8
self.maxtasksperchild = 10000
self.rr = 0
"""original function is different, modified to return something"""
"""for the example purpose, lp is not relevant here but in general is"""
def get_ll(self, lp):
return [random.sample((range(self.length)),int(np.random.random()*12)+1) for ii in range(self.ll_len)]
"""original function is different, modified to return something"""
def get_ip(self): return np.random.random()
"""original function is different, modified to return something"""
def get_op(self): return np.random.random(self.length)
"""original function is different, modified to return something"""
def get_innt(self, nu1, nu2, ip):
return nu1*nu2/ip
"""original function is different, modified to return something"""
def __get_pp(self, nu1):
return np.exp(nu1)
"""dummy function for the example purpose"""
def dummy_function(self):
"""do important stuff"""
return
"""dummy function for the example purpose"""
def dummy_function_2(self, result):
"""do important stuff"""
return np.reshape(result, np.inshape)
"""dummy function for the example purpose"""
def dummy_function_3(self):
"""do important stuff"""
return
"""original function is different, modified to return something"""
"""for the example purpose, lp is not relevant here but in general is"""
def get_llp(self, ll, lp):
return [{'a': np.random.random(), 'b': np.random.random()} for ii in ll]
"""NOTE, lp is not used here for the example purpose but
in the original code, it's very important variable containg
relevant data for calculations"""
def generate(self, lp={}):
"""create a list that is used to the creation of 2-D array"""
"""providing here a dummy pp param to get_ll"""
ll = self.get_ll(lp)
ip = self.get_ip()
self.op = self.get_op()
"""length of args_tmp = self.length * self.length = 256 * 256"""
args_tmp = [
{'self': self,
'nu1': nu1,
'nu2': nu2,
'ii': ii,
'jj': jj,
'innt': np.abs(self.get_innt(nu1, nu2, ip)),
'nu1exp': np.exp(1j*nu1*ip),
'nu2exp': np.exp(1j*nu2*ip),
} for ii, nu1 in enumerate(self.op) for jj, nu2 in enumerate(self.op)]
pool = {}
if self.multiprocessing:
pool = mp.Pool(self.num_cpus, maxtasksperchild=self.maxtasksperchild)
"""number of arrays is equal to len of ll, here 300"""
for ll_ in tqdm(ll):
"""Generate data"""
self.__generate(ll_, lp, pool, args_tmp)
"""Create a pool of CPU threads"""
if self.multiprocessing:
pool.terminate()
def __generate(self, ll, lp, pool = {}, args_tmp = []):
"""In the original code there are plenty other things done in the code
using class' methods, they are not shown here for the example purpose"""
self.dummy_function()
self.llp = self.get_llp(ll, lp)
"""originally the values is taken from lp"""
self.rr = self.rr
if self.multiprocessing and pool:
result = pool.map(wrapper_, args_tmp)
else:
result = [wrapper_(arg) for arg in args_tmp]
"""In the original code there are plenty other things done in the code
using class' methods, they are not shown here for the example purpose"""
result = self.dummy_function_2(result)
"""original function is different"""
def generate_array_elements(self, nu1, nu2, llp, innt, nu1exp, nu2exp, ii = 0, jj = 0, rr=0):
if rr == 1 and self.inshape[0] - 1 - jj < ii:
return 0
elif rr == -1 and ii > jj:
return 0
elif rr == 0:
"""do nothing"""
ll1 = []
ll2 = []
"""In the original code there are plenty other things done in the code
using class' methods, they are not shown here for the example purpose"""
self.dummy_function_3()
for kk, ll in enumerate(llp):
ll1.append(
self.__get_pp(nu1) *
nu1*nu2*nu1exp**ll['a']*np.exp(1j*np.random.random())
)
ll2.append(
self.__get_pp(nu2) *
nu1*nu2*nu2exp**ll['b']*np.exp(1j*np.random.random())
)
t1 = sum(ll1)
t2 = sum(ll2)
result = innt*np.abs(t1 - t2)
return result
g = tmp(False, 256, 300)
g.generate()
2条答案
按热度按时间hjzp0vay1#
很难说你的算法中发生了什么。我对多处理不太了解,但坚持使用函数并避免将self传递到池化进程中可能更安全。当你在
pool.map()
中将args_tmp
传递给wrapper_
时,就可以做到这一点。总体而言,一般来说,我会尝试减少父子进程之间传递的数据量,我尝试将lp
列表的生成移动到池工作进程中,以防止传递过多的数据。最后,虽然我不认为这在这个示例代码中有什么关系,但你应该在使用pool之后进行清理,或者在
with
中使用pool。我重写了你的一些代码来尝试,这似乎更快,但我不是100%,它坚持你的算法。一些变量名称很难区分。
这对我来说运行速度快了很多,但很难判断它是否准确地产生了你的解决方案。如果这是准确的,我的最终结论是,额外的数据传递显着减慢了池工人。
我将添加一个通用模板来展示一个架构,在该架构中,您可以将共享参数的准备工作从任务运行器中分离出来,并且仍然使用类(300似乎比试图将它们拆分为64000更快),并且不要向每个任务传递太多数据。launch_task的界面应尽可能保持简单,在我对代码的重构中,它相当于
start_generate_2
。此处的池文档中有一条有关池清理的警告:https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool
第一项讨论避免共享状态,特别是避免大量数据.https://docs.python.org/3/library/multiprocessing.html#programming-guidelines
sh7euo9m2#
伊恩·威尔逊的建议非常有用,其中一个帮助解决了这个问题。这就是为什么他的答案被标记为正确答案的原因。
正如他所建议的,最好在较少的任务上调用pool。pool.map(N),它为每个数组元素创建了256256次(总共N256*256个任务),现在我在计算整个数组的函数上调用pool.map,所以只有N次。函数内部的数组计算是以序列化的方式完成的。
我仍然将self作为参数发送,因为函数中需要它,但它对性能没有任何影响。
这个小小的变化将数组的计算速度从7- 15 s提高到1.5it/s-2s/it!
当前代码:
再次感谢伊恩。