Parallelization in Python with separation of computation and storage

oprakyz7 · posted 2022-12-17 in Python

I am trying to parallelize the computation of intermediate results and decouple it from their storage. The task is as follows:
Given a large set of tasks to compute, take a chunk of tasks and run some computation on them in parallel on the available CPUs/GPUs. The output is relatively large, so the results of all chunks do not fit in memory at once. Therefore, as soon as one chunk's computation is done, the results collected from the processes are written to a single result file. The real storage mechanism is somewhat more complex and cannot easily be moved into the individual jobs; I really do need to collect the results and store them centrally.
The storage part takes a considerable amount of time, and I don't know how to decouple these two things. Ideally, the workflow would be: compute -> collect -> store, with the next computation already starting while storage runs -> compute / store, and so on.
Here is some pseudocode that has the parallel-computation part but not the compute/store separation. Which framework concepts do I need to make this better/faster?

import numpy as np
from multiprocessing import Pool
import time

def crunch(n):
    print(f"crunch dummy things for input: {n}")
    results = np.random.random(100)
    time.sleep(np.random.randint(0, 3))
    return results

def store(results_npz, index):
    print(f"storing iteration {index}")
    np.savetxt(f'test_{str(index).zfill(2)}.out', results_npz)

# all tasks
all_tasks = list(range(10))

# iterate over tasks in chunks
for i in range(5):
    print(f"start iteration {i}")
    input_chunk = [all_tasks.pop(0), all_tasks.pop(0)]
    with Pool(2) as mp:
        results = mp.map(crunch, input_chunk)

    print("storing results ...")
    # ideally, this should start and then the result computation can start again
    results_all = np.vstack(results)
    store(results_all, i)

EDIT: Important information! Only a single process may run for storing the results.

af7jpaap


To separate the computation and storage tasks, you can use a queue to pass results from the computation processes to a dedicated storage process. Here is an example of how the code could be modified to achieve this:

import numpy as np
from multiprocessing import Process, Queue
import time

def crunch(n, queue):
    print(f"crunch dummy things for input: {n}")
    results = np.random.random(100)
    time.sleep(np.random.randint(0, 3))
    queue.put((n, results))

def store(queue):
    while True:
        result = queue.get()
        if result is None:
            break
        n, results = result
        print(f"storing results for input: {n}")
        np.savetxt(f'test_{str(n).zfill(2)}.out', results)

# all tasks
all_tasks = list(range(10))

# start a single storage process that receives results via the queue
queue = Queue()
storage_process = Process(target=store, args=(queue,))
storage_process.start()

for i in range(5):
    print(f"start iteration {i}")
    input_chunk = [all_tasks.pop(0), all_tasks.pop(0)]
    processes = []
    for n in input_chunk:
        p = Process(target=crunch, args=(n, queue))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

queue.put(None)
storage_process.join()

Explanation:

- We define a function crunch that takes an input n and a queue as arguments, performs some computation, and puts the result in the queue.

- We define a function store that takes a queue as an argument and waits for results to appear in it. When it receives a result, it stores it; when it receives None, it breaks out of the loop and exits.

- We create a queue and a Process object for the store function, and start that process.

- In the main loop, we take one chunk of inputs per iteration, create and start a Process object for each input in the chunk, and then wait for all processes in the chunk to complete.

- After all chunks have been processed, we put None in the queue to signal the storage process to exit, and wait for it to finish.

This way, the computation and storage tasks are decoupled, and the storage process can begin storing results while the computation processes are still working on the next chunk of tasks.
