底部是我现在的代码。它看起来工作得很好。但是,我并不完全理解它。我想如果没有.join()
,我会冒代码在池完成执行之前进入下一个for循环的风险。我们不需要这3行注解掉的代码吗?
另一方面,如果我选择.close()
和.join()
的方式,有没有办法“重新打开”那个关闭的游泳池,而不是每次都使用Pool(6)
?
import multiprocessing as mp
import random as rdm
from statistics import stdev, mean
import time
def mesh_subset(population, n_chosen=5):
chosen = rdm.choices(population, k=n_chosen)
return mean(chosen)
if __name__ == '__main__':
population = [x for x in range(20)]
N_iteration = 10
start_time = time.time()
pool = mp.Pool(6)
for i in range(N_iteration):
print([round(x,2) for x in population])
print(stdev(population))
# pool = mp.Pool(6)
population = pool.map(mesh_subset, [population]*len(population))
# pool.close()
# pool.join()
print('run time:', time.time() - start_time)
1条答案
按热度按时间wrrgggsh1#
工人池的设置成本相对较高,因此(如果可能的话)应该只设置一次,通常是在脚本的开头。
pool.map
命令阻塞,直到所有任务都完成为止。毕竟,它返回结果列表。除非mesh_subset
在所有输入上都被调用并为每个输入返回结果,否则它不能这样做。pool.apply_async
之类的方法不会阻塞。apply_async
返回一个ApplyResult对象,该对象带有get
方法,该方法会阻塞,直到从工作进程获得结果。pool.close
将工作者处理程序的状态设置为CLOSE。这将导致处理程序向工作者发出终止信号。pool.join
会一直阻塞,直到所有工作进程都终止为止。因此,在完成池之前,您不需要调用--实际上不应该调用--
pool.close
和pool.join
。一旦向工作线程发送了终止信号(通过pool.close
),就无法“重新打开”它们。您需要启动一个新的池。在您的情况下,由于您 * 确实 * 希望循环等待所有任务完成,因此使用
pool.apply_async
代替pool.map
没有任何优势,但是如果您使用pool.apply_async
,则可以通过调用get
而不是关闭并重新启动池来获得与之前相同的结果:循环完成后,
len(population)
保持不变。如果您不希望在所有任务完成之前阻塞每个循环,可以使用
apply_async
的callback
功能:现在,当任何
mesh_subset
返回return_value
时,将调用result.append(return_value)
。对apply_async
的调用不会阻塞,因此N_iteration * N_pop
任务会一次全部推入pool
的任务队列中。但由于池中有6个工作线程,因此在任何给定时间最多只能运行6个对mesh_subset
的调用。当工作线程完成任务时,无论哪个工作线程先调用result.append(return_value)
,result
中的值都是无序的,这与pool.map
不同,pool.map
返回一个列表,其返回值的顺序与其对应的参数列表的顺序相同。除非出现异常,
result
最终将包含N_iteration * N_pop
返回值 * 一旦所有任务完成 *。上面,pool.close()
和pool.join()
用于等待所有任务完成。