我正在通过mpi4py学习并行计算。由于我处理的是一个大数据集,为了不出现内存问题,我需要在主进程上预分配内存。这就是为什么我使用Scatterv
和Gatherv
方法的原因。所提出的代码只有分配内存的范围,而没有进行任何具体操作
import numpy as np
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
nprocs = comm.Get_size()
if rank == 0:
sendbuf = np.random.rand(4,3)
r, c = sendbuf.shape
ave, res = divmod(c, nprocs)
count = [ave + 1 if p < res else ave for p in range(nprocs)]
count = np.array(count)
print("count is ", count)
# displacement: the starting index of each sub-task
displ = [sum(count[:p]) for p in range(nprocs)]
displ = np.array(displ)
else:
sendbuf = None
# initialize count on worker processes
count = np.zeros(nprocs, dtype=np.int)
displ = None
# broadcast count
comm.Bcast(count, root=0)
# initialize recvbuf on all processes
recvbuf = np.zeros((4,count[rank]))
comm.Scatterv([sendbuf, count, displ, MPI.DOUBLE], recvbuf, root=0)
a, b = recvbuf.shape
sendbuf2 = np.random.rand(a,b)
recvbuf2 = np.zeros((4,sum(count)))
comm.Gatherv(sendbuf2, [recvbuf2, count, displ, MPI.DOUBLE], root=0)
在主过程中,首先定义了一个随机的二维数组尺寸(sendbuf
)(四、3).我想做的是通过将该矩阵划分为列来将其分散到不同的过程然后初始化recvbuf
变量以接收sendbuf
的信息块。然后我使用Scatterv
方法来传递信息。我注意到只有第一行中的数据被正确传递。这并不重要,因为在实际应用程序中,recvbuf
变量只用于预分配内存。此时我重新定义了recvbuf
变量。然后我尝试将信息发送回主节点,但是代码给出了错误。我真的不明白我在Gatherv
部分做错了什么。
我试图保持例子尽可能简单,这样代码就不会做任何具体的事情。我想学习的是如何正确地分散和收集二维numpy数组。
1条答案
按热度按时间p4rjhz4m1#
你到底犯了什么“错误”?
这是一个带有2D数组的gatherv的工作版本。请记住,数组的大小与2D数组的大小成比例,因为NumPy在内存中有以行为主的数组
Linux操作系统,MPI:'mpirun(开放MPI)4.1.2',MPI4PY:'mpi4py 3.1.4'