numpy 二维数值数组mpi4py的集合

kmbjn2e3  于 2022-12-13  发布在  其他
关注(0)|答案(1)|浏览(157)

我正在通过mpi4py学习并行计算。由于我处理的是一个大数据集,为了不出现内存问题,我需要在主进程上预分配内存。这就是为什么我使用ScattervGatherv方法的原因。所提出的代码只有分配内存的范围,而没有进行任何具体操作

import numpy as np
from mpi4py import MPI

comm   = MPI.COMM_WORLD
rank   = comm.Get_rank()
nprocs = comm.Get_size()


if rank == 0:
    
    sendbuf = np.random.rand(4,3)
    r, c    = sendbuf.shape

    ave, res = divmod(c, nprocs)
    count = [ave + 1 if p < res else ave for p in range(nprocs)]
    count = np.array(count)
    
    print("count is ", count)

    # displacement: the starting index of each sub-task
    displ = [sum(count[:p]) for p in range(nprocs)]
    displ = np.array(displ)
else:
    sendbuf = None
    # initialize count on worker processes
    count = np.zeros(nprocs, dtype=np.int)
    displ = None
    
    
# broadcast count
comm.Bcast(count, root=0)

# initialize recvbuf on all processes
recvbuf = np.zeros((4,count[rank]))

comm.Scatterv([sendbuf, count, displ, MPI.DOUBLE], recvbuf, root=0) 

a, b     = recvbuf.shape
sendbuf2 = np.random.rand(a,b)
recvbuf2 = np.zeros((4,sum(count)))

comm.Gatherv(sendbuf2, [recvbuf2, count, displ, MPI.DOUBLE], root=0)

在主过程中,首先定义了一个随机的二维数组尺寸(sendbuf)(四、3).我想做的是通过将该矩阵划分为列来将其分散到不同的过程然后初始化recvbuf变量以接收sendbuf的信息块。然后我使用Scatterv方法来传递信息。我注意到只有第一行中的数据被正确传递。这并不重要,因为在实际应用程序中,recvbuf变量只用于预分配内存。此时我重新定义了recvbuf变量。然后我尝试将信息发送回主节点,但是代码给出了错误。我真的不明白我在Gatherv部分做错了什么。
我试图保持例子尽可能简单,这样代码就不会做任何具体的事情。我想学习的是如何正确地分散和收集二维numpy数组。

p4rjhz4m

p4rjhz4m1#

你到底犯了什么“错误”?
这是一个带有2D数组的gatherv的工作版本。请记住,数组的大小与2D数组的大小成比例,因为NumPy在内存中有以行为主的数组

from mpi4py import MPI
import numpy as np

comm_world = MPI.COMM_WORLD
my_rank = comm_world.Get_rank()
num_proc = comm_world.Get_size()

# Parameters for this script
rowlength = 2
sizes = 2*np.ones((num_proc), dtype=np.int32)
sizes[-1] = 1

# Construct some data
data = [np.array((), dtype=np.double) for _ in range(num_proc)]
data[my_rank] = np.array(my_rank+np.random.rand(sizes[my_rank], rowlength), np.double)

# Compute sizes and offsets for Allgatherv
sizes_memory = rowlength*sizes
offsets = np.zeros(num_proc)
offsets[1:] = np.cumsum(sizes_memory)[:-1]

if my_rank == 0:
   print(f"Total size {np.sum(sizes)}")
   print(f"Sizes: {sizes}")
   print(f"Sizes: {sizes_memory}")
   print(f"Offsets: {offsets}")

# Prepare buffer for Allgatherv
data_out = np.empty((np.sum(sizes), rowlength), dtype=np.double)
comm_world.Allgatherv(
   data[my_rank],
   recvbuf=[data_out, sizes_memory.tolist(), offsets.tolist(), MPI.DOUBLE])

if (my_rank == 0):
   print(f"Data_out has shape {data_out.shape}")
   print(data_out[:, 0])

Linux操作系统,MPI:'mpirun(开放MPI)4.1.2',MPI4PY:'mpi4py 3.1.4'

相关问题