我正在尝试并行化中间结果的计算和存储。任务可以描述如下:
给定要计算的一大组任务,取一个任务块,并在可用的CPU/GPU上并行进行某种计算。输出相对较大,因此无法将其放入所有块的内存中。因此,一旦完成一个块计算,就将从进程中收集的结果写入单个结果文件。真实的的存储机制稍微复杂一些,无法轻松地移动到各个作业。我真的需要把它们收集起来,集中存放起来。
存储部分需要相当长的时间,我不知道如何将这两件事分离开来。理想情况下,工作流应该是:计算-〉收集-〉存储/存储时已经开始计算-〉计算/存储等。
这里有一些伪代码,它只具有并行计算的特性,而没有计算/存储分离的特性。我需要实现什么框架概念来使它更好/更快?
import numpy as np
from multiprocessing import Pool
import time
def crunch(n):
print(f"crunch dummy things for input: {n}")
results = np.random.random(100)
time.sleep(np.random.randint(0, 3))
return results
def store(results_npz, index):
print(f"storing iteration {index}")
np.savetxt(f'test_{str(index).zfill(2)}.out', results_npz)
# all tasks
all_tasks = list(range(10))
# iterate over tasks in chunks
for i in range(5):
print(f"start iteration {i}")
input_chunk = [all_tasks.pop(0), all_tasks.pop(0)]
with Pool(2) as mp:
results = mp.map(crunch, input_chunk)
print("storing results ...")
# ideally, this should start and then the result computation can start again
results_all = np.vstack(results)
store(results, i)
编辑:重要信息!只能运行一个存储结果的进程。
1条答案
按热度按时间af7jpaap1#
要分离计算和存储任务,可以使用队列将结果从计算进程传递到单独的存储进程。以下是如何修改代码以实现此目的的示例:
说明:
这样,计算和存储任务被解耦,并且存储进程可以在计算进程仍在处理下一任务块的同时开始存储结果。