python-3.x 如何重用多处理池?

gijlo24d  于 2023-01-22  发布在  Python
关注(0)|答案(1)|浏览(130)

底部是我现在的代码。它看起来工作得很好。但是,我并不完全理解它。我想如果没有.join(),我会冒代码在池完成执行之前进入下一个for循环的风险。我们不需要这3行注解掉的代码吗?
另一方面,如果我选择.close().join()的方式,有没有办法“重新打开”那个关闭的游泳池,而不是每次都使用Pool(6)

import multiprocessing as mp
import random as rdm
from statistics import stdev, mean
import time

def mesh_subset(population, n_chosen=5):
    chosen = rdm.choices(population, k=n_chosen)
    return mean(chosen)

if __name__ == '__main__':
    population = [x for x in range(20)]
    N_iteration = 10
    start_time = time.time()
    pool = mp.Pool(6)
    for i in range(N_iteration):
        print([round(x,2) for x in population])
        print(stdev(population))
        # pool = mp.Pool(6)
        population = pool.map(mesh_subset, [population]*len(population))
        # pool.close()
        # pool.join()
    print('run time:', time.time() - start_time)
wrrgggsh

wrrgggsh1#

工人池的设置成本相对较高,因此(如果可能的话)应该只设置一次,通常是在脚本的开头。
pool.map命令阻塞,直到所有任务都完成为止。毕竟,它返回结果列表。除非mesh_subset在所有输入上都被调用并为每个输入返回结果,否则它不能这样做。pool.apply_async之类的方法不会阻塞。apply_async返回一个ApplyResult对象,该对象带有get方法,该方法会阻塞,直到从工作进程获得结果。
pool.close将工作者处理程序的状态设置为CLOSE。这将导致处理程序向工作者发出终止信号。
pool.join会一直阻塞,直到所有工作进程都终止为止。
因此,在完成池之前,您不需要调用--实际上不应该调用--pool.closepool.join。一旦向工作线程发送了终止信号(通过pool.close),就无法“重新打开”它们。您需要启动一个新的池。
在您的情况下,由于您 * 确实 * 希望循环等待所有任务完成,因此使用pool.apply_async代替pool.map没有任何优势,但是如果您使用pool.apply_async,则可以通过调用get而不是关闭并重新启动池来获得与之前相同的结果:

# you could do this, but using pool.map is simpler
for i in range(N_iteration):
    apply_results = [pool.apply_async(mesh_subset, [population]) for i in range(len(population))]
    # the call to result.get() blocks until its worker process (running
    # mesh_subset) returns a value
    population = [result.get() for result in apply_results]

循环完成后,len(population)保持不变。
如果您不希望在所有任务完成之前阻塞每个循环,可以使用apply_asynccallback功能:

N_pop = len(population)
result = []
for i in range(N_iteration):
    for i in range(N_pop):
        pool.apply_async(mesh_subset, [population]),
                         callback=result.append)
pool.close()
pool.join()
print(result)

现在,当任何mesh_subset返回return_value时,将调用result.append(return_value)。对apply_async的调用不会阻塞,因此N_iteration * N_pop任务会一次全部推入pool的任务队列中。但由于池中有6个工作线程,因此在任何给定时间最多只能运行6个对mesh_subset的调用。当工作线程完成任务时,无论哪个工作线程先调用result.append(return_value)result中的值都是无序的,这与pool.map不同,pool.map返回一个列表,其返回值的顺序与其对应的参数列表的顺序相同。
除非出现异常,result最终将包含N_iteration * N_pop返回值 * 一旦所有任务完成 *。上面,pool.close()pool.join()用于等待所有任务完成。

相关问题