Why does a Python multiprocessing script slow down after running for a while?

gr8qqesn · posted 2023-04-19 · in Python
Follow (0) | Answers (2) | Views (190)

Before posting this question, I read an old question, Why does this python multiprocessing script slow down after a while?, and many others. They did not answer my question.
The idea of the script: it generates 256 x 256 arrays in a serial loop. The elements of each array are computed one by one from a list of dictionaries holding the relevant parameters, one dictionary per array element (256 x 256 per list in total). The list is how I enable parallel computation.
The problem: at first, generating the data speeds up from a dozen or so seconds to a few seconds; then, after a few iterations, each new array takes a fraction of a second longer to generate, until eventually everything takes forever to compute.
Additional information:
1. I am using the pool.map function. After some small changes to identify which element is being computed, I also tried map_async. Unfortunately it was slower, because the pool had to be initialized every time an array finished computing.
2. When using pool.map, I initialize the pool once, before any operations start. This way I hope to save the pool-initialization time compared with map_async.
3. The CPU shows low usage, up to ~18%.
4. In my case the hard drive is not the bottleneck. All data needed for the calculations is in RAM, and I do not save data to the hard drive; everything stays in RAM.
5. I also checked whether the problem persists with different numbers of cores (2-24). It did not change anything either.
6. I ran some additional tests, running and terminating a pool a) after each generated array, and b) after every 10 arrays. In each case the code executed slower than the previous pool's run, i.e. if the previous pool had slowed down to 5 s, the next pool ran at 5.X s, and so on. The only time execution did not slow down was when I ran the code serially.
7. Working environment: Windows 10, Python 3.7, conda 4.8.2, Spyder 4.
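(Regarding observation 6 above: one standard hedge against worker processes degrading over time is `maxtasksperchild`, which is already set in the code below. This is a minimal illustrative sketch, not taken from the original script.)

```python
import multiprocessing as mp

def work(x):
    # trivial stand-in for the per-array computation
    return x + 1

if __name__ == '__main__':
    # maxtasksperchild=100 recycles each worker process after 100 tasks,
    # so any state accumulated inside a worker cannot build up indefinitely
    with mp.Pool(4, maxtasksperchild=100) as pool:
        results = pool.map(work, range(1000))
    print(results[:5])  # [1, 2, 3, 4, 5]
```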

The question: why does multiprocessing slow down after a while when only the CPU and RAM are involved (no hard-drive slowdown)? Any ideas?
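(A hypothetical diagnostic, not from the question: every `pool.map` call pickles all of its arguments and ships them to the workers, so the per-call cost grows with the serialized size of the argument list, independent of the actual computation. A sketch of how one might measure that overhead:)

```python
import multiprocessing as mp
import pickle
import time

def noop(x):
    # does no real work, so the measured time is mostly IPC overhead
    return 0

if __name__ == '__main__':
    with mp.Pool(2) as pool:
        for n in (1_000, 100_000):
            args = list(range(n))
            size = len(pickle.dumps(args))  # rough bytes shipped per call
            t0 = time.perf_counter()
            pool.map(noop, args)
            print(f"n={n}: ~{size} bytes pickled, {time.perf_counter() - t0:.3f}s")
```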

Updated code:

import multiprocessing as mp 
from tqdm import tqdm
import numpy as np
import random

def wrapper_(arg):
    return tmp.generate_array_elements(
        self=arg['self'], 
        nu1=arg['nu1'], 
        nu2=arg['nu2'], 
        innt=arg['innt'], 
        nu1exp=arg['nu1exp'], 
        nu2exp=arg['nu2exp'], 
        ii=arg['ii'], 
        jj=arg['jj'],
        llp=arg['self'].llp, 
        rr=arg['self'].rr, 
    )

class tmp:
    def __init__(self, multiprocessing, length, n_of_arrays):
        self.multiprocessing = multiprocessing
        self.inshape = (length,length)
        self.length = length
        self.ll_len = n_of_arrays
        self.num_cpus = 8
        self.maxtasksperchild = 10000
        self.rr = 0
        
    """original function is different, modified to return something"""
    """for the example purpose, lp is not relevant here but in general is"""
    def get_ll(self, lp): 
        return [random.sample((range(self.length)),int(np.random.random()*12)+1) for ii in range(self.ll_len)]
    
    """original function is different, modified to return something"""
    def get_ip(self): return np.random.random()
    
    """original function is different, modified to return something"""
    def get_op(self): return np.random.random(self.length)
    
    """original function is different, modified to return something"""    
    def get_innt(self, nu1, nu2, ip):
        return nu1*nu2/ip
    
    """original function is different, modified to return something"""    
    def __get_pp(self, nu1):
        return np.exp(nu1)
    
    """dummy function for the example purpose"""
    def dummy_function(self):
        """do important stuff"""
        return 
    
    """dummy function for the example purpose"""
    def dummy_function_2(self, result):
        """do important stuff"""
        return np.reshape(result, self.inshape)
    
    """dummy function for the example purpose"""
    def dummy_function_3(self):
        """do important stuff"""
        return
    
    """original function is different, modified to return something"""
    """for the example purpose, lp is not relevant here but in general is"""
    def get_llp(self, ll, lp):
        return [{'a': np.random.random(), 'b': np.random.random()} for ii in ll]
        
    """NOTE, lp is not used here for the example purpose but
    in the original code, it's a very important variable containing
    relevant data for calculations"""
    def generate(self, lp={}):
        """create a list that is used to the creation of 2-D array"""
        """providing here a dummy pp param to get_ll"""
        ll = self.get_ll(lp)
        ip = self.get_ip()
        
        self.op = self.get_op()
        
        """length of args_tmp = self.length * self.length = 256 * 256"""
        args_tmp = [
            {'self': self, 
             'nu1': nu1,  
             'nu2': nu2, 
             'ii': ii, 
             'jj': jj,
             'innt': np.abs(self.get_innt(nu1, nu2, ip)),
             'nu1exp': np.exp(1j*nu1*ip),
             'nu2exp': np.exp(1j*nu2*ip),
             } for ii, nu1 in enumerate(self.op) for jj, nu2 in enumerate(self.op)]
        
        pool = {}
        if self.multiprocessing: 
            pool = mp.Pool(self.num_cpus, maxtasksperchild=self.maxtasksperchild)
        
        """number of arrays is equal to len of ll, here 300"""
        for ll_ in tqdm(ll):
            """Generate data"""
            self.__generate(ll_, lp, pool, args_tmp)
        
        """Create a pool of CPU threads"""
        if self.multiprocessing: 
            pool.terminate()

    def __generate(self, ll, lp, pool = {}, args_tmp = []):
        """In the original code there are plenty other things done in the code
        using class' methods, they are not shown here for the example purpose"""
        self.dummy_function()
        
        self.llp = self.get_llp(ll, lp)
        """originally the values is taken from lp"""
        self.rr = self.rr
        
        if self.multiprocessing and pool: 
            result = pool.map(wrapper_, args_tmp)
        else: 
            result = [wrapper_(arg) for arg in args_tmp]
        
        """In the original code there are plenty other things done in the code
        using class' methods, they are not shown here for the example purpose"""
        result = self.dummy_function_2(result)
    
    """original function is different"""
    def generate_array_elements(self, nu1, nu2, llp, innt, nu1exp, nu2exp, ii = 0, jj = 0, rr=0):
        if rr == 1 and self.inshape[0] - 1 - jj < ii: 
            return 0
        elif rr == -1 and ii > jj: 
            return 0
        elif rr == 0:
            pass  # do nothing
        
        ll1 = []
        ll2 = []
        
        """In the original code there are plenty other things done in the code
        using class' methods, they are not shown here for the example purpose"""
        self.dummy_function_3()
        
        for kk, ll in enumerate(llp):
            ll1.append(
               self.__get_pp(nu1) * 
               nu1*nu2*nu1exp**ll['a']*np.exp(1j*np.random.random())
            )
            ll2.append(
               self.__get_pp(nu2) * 
               nu1*nu2*nu2exp**ll['b']*np.exp(1j*np.random.random())
            )
            
        t1 = sum(ll1)
        t2 = sum(ll2)
        result = innt*np.abs(t1 - t2)
        return result
    
    
    
g = tmp(False, 256, 300)
g.generate()

hjzp0vay1#

It's hard to say what's going on in your algorithm. I don't know multiprocessing deeply, but it is probably safer to stick with functions and avoid passing self into the pooled processes. That happens when you pass args_tmp to wrapper_ in pool.map(). In general, I would try to reduce the amount of data passed between the parent and child processes; I tried moving the generation of the lp list into the pool workers to prevent passing too much data.
Finally, although I don't think it matters in this example code, you should clean up after using the pool, or use the pool inside a with block.
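(A minimal sketch of that cleanup advice, not part of the question's code: using the pool as a context manager guarantees the workers are torn down even if an exception escapes.)

```python
import multiprocessing as mp

def square(x):
    return x * x

if __name__ == '__main__':
    # __exit__ calls pool.terminate(), so no manual cleanup is needed
    with mp.Pool(4) as pool:
        result = pool.map(square, range(8))
    print(result)  # [0, 1, 4, 9, 16, 25, 36, 49]
```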
I rewrote some of your code to experiment, and this seems faster, but I'm not 100% sure it sticks to your algorithm. Some of the variable names are hard to tell apart.
This runs a lot faster for me, but it's hard to tell whether it accurately reproduces your solution. If it does, my final conclusion is that the extra data passing was significantly slowing down the pool workers.

#main.py
if __name__ == '__main__':
    import os
    import sys
    file_dir = os.path.dirname(__file__)
    sys.path.append(file_dir)

    from tmp import generate_1
    parallel = True
    generate_1(parallel)
#tmp.py
import multiprocessing as mp 
import numpy as np
import random
from tqdm import tqdm
from itertools import starmap

def wrapper_(arg):
    return arg['self'].generate_array_elements(
        nu1=arg['nu1'],
        nu2=arg['nu2'],
        ii=arg['ii'],
        jj=arg['jj'],
        lp=arg['self'].lp,
        nu1exp=arg['nu1exp'],
        nu2exp=arg['nu2exp'],
        innt=arg['innt']
    )

def generate_1(parallel):
    """create a list that is used to the creation of 2-D array"""
    il = np.random.random(256)
    """generating params for parallel data generation"""
    """some params are also calculated here to speed up the calculation process
    because they are always the same so they can be calculated just once"""
    """this code creates a list of 256*256 elements"""
    args_tmp = [
    {
     'nu1': nu1,  
     'nu2': nu2, 
     'ii': ii, 
     'jj': jj,
     'innt': np.random.random()*nu1+np.random.random()*nu2,
     'nu1exp': np.exp(1j*nu1),
     'nu2exp': np.exp(1j*nu2),
    } for ii, nu1 in enumerate(il) for jj, nu2 in enumerate(il)]

    """init pool"""
    

    """get list of arrays to generate"""
    ip_list = [random.sample((range(256)),int(np.random.random()*12)+1) for ii in range(300)]

    map_args = [(idx, ip, args_tmp) for idx, ip in enumerate(ip_list)]
    """separate function to do other important things"""
    if parallel:
        with mp.Pool(8, maxtasksperchild=10000) as pool:
            result = pool.starmap(start_generate_2, map_args)
    else:
        result = starmap(start_generate_2, map_args)
    # Wrap iterator in list call.
    return list(result)

def start_generate_2(idx, ip, args_tmp):
    print ('starting {idx}'.format(idx=idx))
    runner = Runner()
    result = runner.generate_2(ip, args_tmp)
    print ('finished {idx}'.format(idx=idx))
    return result

class Runner():

    def generate_2(self, ip, args_tmp):
        """NOTE, the method is much more extensive and uses other methods of the class""" 
        """so it must remain a method of the class that is not static!"""
        self.lp = [{'a': np.random.random(), 'b': np.random.random()} for ii in ip]
        """this part creates 1-D array of the length of args_tmp, that's 256*256"""
        result = map(wrapper_, [dict(args, self=self) for args in args_tmp])
        """it's then reshaped to 2-D array"""
        result = np.reshape(list(result), (256,256))
        return result
    
    def generate_array_elements(self, nu1, nu2, ii, jj, lp, nu1exp, nu2exp, innt):
        """doing heavy calc"""
        """"here is something else"""
        if ii > jj: return 0
            
        ll1 = []
        ll2 = []
        for kk, ll in enumerate(lp):
            ll1.append(nu1*nu2*nu1exp**ll['a']*np.exp(1j*np.random.random()))
            ll2.append(nu1*nu2*nu2exp**ll['b']*np.exp(1j*np.random.random()))
            
        t1 = sum(ll1)
        t2 = sum(ll2)
        result = innt*np.abs(t1 - t2)
        return result

I'll add a generic template showing an architecture where the preparation of shared args is separated from the task runner, which still uses a class (300 tasks seems faster than trying to split them into 64000), and where not too much data is passed to each task. The interface of launch_task should stay as simple as possible; in my refactoring of your code it is equivalent to start_generate_2.

import multiprocessing
from itertools import starmap

class Launcher():
    def __init__(self, parallel):
        self.parallel = parallel

    def generate_shared_args(self):
        return [(i, j) for i, j in enumerate(range(300))]

    def launch(self):
        shared_args = self.generate_shared_args()
        if self.parallel:
            with multiprocessing.Pool(8) as pool:
                result = pool.starmap(launch_task, shared_args)
        else:
            result = starmap(launch_task, shared_args)
        # Wrap in list to resolve iterable.
        return list(result)

def launch_task(i, j):
    task = Task(i, j)
    return task.run()

class Task():

    def __init__(self, i, j):
        self.i = i
        self.j = j

    def run(self):
        return self.i + self.j

if __name__ == '__main__':
    parallel = True
    launcher = Launcher(parallel)
    print(launcher.launch())

The Pool documentation has a warning about pool cleanup here: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool
The first item in the programming guidelines discusses avoiding shared state, and in particular avoiding large amounts of data: https://docs.python.org/3/library/multiprocessing.html#programming-guidelines
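(One standard way to follow that guideline, sketched here under the assumption that the shared argument list is identical for every task: send it once per worker via the pool's `initializer`, instead of once per task. The names `init_worker` and `task` are hypothetical.)

```python
import multiprocessing as mp

_shared = None  # set inside each worker by the initializer

def init_worker(shared):
    # runs once per worker process: the shared data is pickled
    # num_workers times instead of once per task
    global _shared
    _shared = shared

def task(i):
    return _shared[i] * 2

if __name__ == '__main__':
    big = list(range(65536))  # stand-in for the 256*256-element args_tmp list
    with mp.Pool(4, initializer=init_worker, initargs=(big,)) as pool:
        out = pool.map(task, range(10))
    print(out)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```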


sh7euo9m2#

Ian Wilson's suggestions were very helpful, and one of them solved the problem. That's why his answer is marked as the accepted one.
As he suggested, it's better to call the pool on fewer tasks. Previously, pool.map was called once per array element, i.e. 256*256 times per array (N*256*256 tasks in total); now I call pool.map on the function that computes a whole array, so only N times. The array elements inside that function are computed serially.
I still pass self as a parameter because it's needed in the function, but it has no noticeable impact on performance.
This small change sped up the computation of the arrays from 7-15 s each to 1.5 it/s-2 s/it!
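(The granularity change described above, sketched in isolation with hypothetical stand-in functions rather than the original algorithm: submit one task per array rather than one task per element.)

```python
import multiprocessing as mp
import numpy as np

def build_array(seed):
    # one task computes a whole 256x256 array serially ...
    rng = np.random.default_rng(seed)
    return rng.random((256, 256))

if __name__ == '__main__':
    # ... so N arrays mean N tasks, not N*256*256 tasks
    with mp.Pool(4) as pool:
        arrays = pool.map(build_array, range(8))
    print(len(arrays), arrays[0].shape)  # 8 (256, 256)
```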
Current code:

import multiprocessing as mp 
import tqdm
import numpy as np
import random

def wrapper_(arg):
    return tmp.generate_array_elements(
        self=arg['self'], 
        nu1=arg['nu1'], 
        nu2=arg['nu2'], 
        innt=arg['innt'], 
        nu1exp=arg['nu1exp'], 
        nu2exp=arg['nu2exp'], 
        ii=arg['ii'], 
        jj=arg['jj'],
        llp=arg['self'].llp, 
        rr=arg['self'].rr, 
    )

"""NEW WRAPPER HERE"""
"""Sending self doesn't have bad impact on the performance, at least I don't complain :)"""
def generate(arg):
   tmp._tmp__generate(arg['self'], arg['ll'], arg['lp'], arg['pool'], arg['args_tmp'])

class tmp:
    def __init__(self, multiprocessing, length, n_of_arrays):
        self.multiprocessing = multiprocessing
        self.inshape = (length,length)
        self.length = length
        self.ll_len = n_of_arrays
        self.num_cpus = 8
        self.maxtasksperchild = 10000
        self.rr = 0
        
    """original function is different, modified to return something"""
    """for the example purpose, lp is not relevant here but in general is"""
    def get_ll(self, lp): 
        return [random.sample((range(self.length)),int(np.random.random()*12)+1) for ii in range(self.ll_len)]
    
    """original function is different, modified to return something"""
    def get_ip(self): return np.random.random()
    
    """original function is different, modified to return something"""
    def get_op(self): return np.random.random(self.length)
    
    """original function is different, modified to return something"""    
    def get_innt(self, nu1, nu2, ip):
        return nu1*nu2/ip
    
    """original function is different, modified to return something"""    
    def __get_pp(self, nu1):
        return np.exp(nu1)
    
    """dummy function for the example purpose"""
    def dummy_function(self):
        """do important stuff"""
        return 
    
    """dummy function for the example purpose"""
    def dummy_function_2(self, result):
        """do important stuff"""
        return np.reshape(result, self.inshape)
    
    """dummy function for the example purpose"""
    def dummy_function_3(self):
        """do important stuff"""
        return
    
    """original function is different, modified to return something"""
    """for the example purpose, lp is not relevant here but in general is"""
    def get_llp(self, ll, lp):
        return [{'a': np.random.random(), 'b': np.random.random()} for ii in ll]
        
    """NOTE, lp is not used here for the example purpose but
    in the original code, it's a very important variable containing
    relevant data for calculations"""
    def generate(self, lp={}):
        """create a list that is used to the creation of 2-D array"""
        """providing here a dummy pp param to get_ll"""
        ll = self.get_ll(lp)
        ip = self.get_ip()
        
        self.op = self.get_op()
        
        """length of args_tmp = self.length * self.length = 256 * 256"""
        args_tmp = [
            {'self': self, 
             'nu1': nu1,  
             'nu2': nu2, 
             'ii': ii, 
             'jj': jj,
             'innt': np.abs(self.get_innt(nu1, nu2, ip)),
             'nu1exp': np.exp(1j*nu1*ip),
             'nu2exp': np.exp(1j*nu2*ip),
             } for ii, nu1 in enumerate(self.op) for jj, nu2 in enumerate(self.op)]
        
        pool = {}
        
        """MAJOR CHANGE IN THIS PART AND BELOW"""
        map_args = [{'self': self, 'idx': (idx, len(ll)), 'll': ll, 'lp': lp, 'pool': pool, 'args_tmp': args_tmp} for idx, ll in enumerate(ll)]

        if self.multiprocessing: 
            pool = mp.Pool(self.num_cpus, maxtasksperchild=self.maxtasksperchild)
            
            for _ in tqdm.tqdm(pool.imap_unordered(generate, map_args), total=len(map_args)):
                pass
            pool.close()
            pool.join()
        else:
            for map_arg in tqdm.tqdm(map_args):
                generate(map_arg)

    def __generate(self, ll, lp, pool = {}, args_tmp = []):
        """In the original code there are plenty other things done in the code
        using class' methods, they are not shown here for the example purpose"""
        self.dummy_function()
        
        self.llp = self.get_llp(ll, lp)
        """originally the values is taken from lp"""
        self.rr = self.rr
        
        """REMOVED PARALLEL CALL HERE"""
        result = [wrapper_(arg) for arg in args_tmp]
        
        """In the original code there are plenty other things done in the code
        using class' methods, they are not shown here for the example purpose"""
        result = self.dummy_function_2(result)
    
    """original function is different"""
    def generate_array_elements(self, nu1, nu2, llp, innt, nu1exp, nu2exp, ii = 0, jj = 0, rr=0):
        if rr == 1 and self.inshape[0] - 1 - jj < ii: 
            return 0
        elif rr == -1 and ii > jj: 
            return 0
        elif rr == 0:
            pass  # do nothing
        
        ll1 = []
        ll2 = []
        
        """In the original code, there are plenty other things done in the code
        using class' methods, they are not shown here for the example purpose"""
        self.dummy_function_3()
        
        for kk, ll in enumerate(llp):
            ll1.append(
               self.__get_pp(nu1) * 
               nu1*nu2*nu1exp**ll['a']*np.exp(1j*np.random.random())
            )
            ll2.append(
               self.__get_pp(nu2) * 
               nu1*nu2*nu2exp**ll['b']*np.exp(1j*np.random.random())
            )
            
        t1 = sum(ll1)
        t2 = sum(ll2)
        result = innt*np.abs(t1 - t2)
        return result
    
    
    
g = tmp(False, 256, 300)
g.generate()

Thanks again, Ian.
