numpy 如何通过多处理将更改应用到np.memmap?

4nkexdtk  于 12个月前  发布在  其他
关注(0)|答案(1)|浏览(59)

我手头的当前任务需要多个数组操作,这比可行的时间要长。我正试图利用多处理包来加速这个过程,但我目前在混合使用np.memmap和多处理时遇到了问题。具体来说,我似乎无法写入内存Map或在之后保存它。
目前的计划是在磁盘上创建一个大的numpy memmap,生成数据并将其逐个复制到memmap上,然后保存以供以后使用。问题似乎来自写入或保存部分,因为数据生成部分工作正常。下面是示例代码。

import os
import multiprocessing

import numpy as np

from datetime import date
from tqdm.auto import tqdm

from Micelle_Simulation import Simulated_Micelle

def target(
    id_: int, 
    q_range: np.ndarray, 
    fp: np.memmap, 
    shape: str, 
    r: float, 
    epsilon: float, 
    N_agg: int, 
    rho_beta: float, 
    b: float, 
    N: int, 
    n: int, 
    f_core: float
) -> None:
        
    micelle = Simulated_Micelle(
        shape=shape, 
        r=r, 
        epsilon=epsilon, 
        N_agg=N_agg, 
        rho_beta=rho_beta, 
        b=b, 
        N=N, 
        n=n, 
        f_core=f_core
    )
    
    n_corona = n*N*N_agg
    n_core = int((f_core/(1 - f_core))*n_corona)
    
    params = np.array((
        r, 
        epsilon, 
        micelle.R_g, 
        N_agg, 
        rho_beta, 
        b, 
        N, 
        n, 
        f_core
    ))
    
    I_q = micelle.Debye_Scattering(
        nodes=micelle.scatterers, 
        core_node_num=n_core, 
        q_range=q_range, 
        rho_beta=rho_beta
    )
    
    params_I_q = np.hstack((params, I_q))
        
    fp[id_, :] = params_I_q
    print(params_I_q)
    

def main(*args, **kwargs) -> int:
    
    remainder = 2
    p_count = os.cpu_count() - remainder
    
    processes = []
    
    for i in range(p_count):
        processes.append(None)
    
    sample_num = 2
    
    shape = 'sphere'
    
    cwd = os.getcwd()

    today = str(date.today()).replace("-", "_")
    username = os.getlogin()
    end = ".npy"

    file_name = f"{today}_{username}"
    attempt = 0
    
    while True:
        
        temp_name = f"{file_name}_{attempt}{end}"
        temp_path = os.path.join(cwd, temp_name)
        
        if not os.path.isfile(temp_path):
            path = temp_path
            
            break
        else:
            attempt += 1
    
    fp = np.memmap(path, dtype='float32', mode='w+', shape=(sample_num, 265))
    
    # storage = np.empty((sample_num, 265))
      
    q_log_range = np.arange(-2, 0, np.true_divide(1, 128))
    q_range = np.power(10, q_log_range - 2*np.log10(2))
    
    p_num = 0
    
    print(f'Using {p_count} threads.')
    
    for i in tqdm(range(sample_num)):
        
        r = np.power(2, np.random.uniform(6, 10))
        epsilon = 1
        N_agg = np.random.randint(8, 33)
        rho_beta = np.random.normal(0.1, 0.02)
        b = np.power(2, np.random.uniform(4, 7))
        N = np.random.randint(2, 9)
        n = 6
        f_core = np.random.normal(0.7, 0.05)
        
        args = (i, q_range, fp, shape, r, epsilon, N_agg, rho_beta, b, N, n, f_core)
                        
        while True:
            
            if processes[p_num] is None or not processes[p_num].is_alive():
                
                p = multiprocessing.Process(target=target, args=args)
                p.start()
                processes[p_num] = p
                
                break
            
            else:
                p_num = (p_num + 1)%p_count
    
    print('Loop completed.')
    
    for process in processes:
        
        if process:
            process.join()
        else:
            continue
    
    print("Task completed.")
    
    fp.flush()
    
    print(fp)
                    
    return 0

if __name__ == '__main__':
    main()

字符串
忽略任何涉及Micelle_Simulation的代码部分。它只创建一个(265,)数组。
问题似乎来自代码的以下部分。

fp[id_, :] = params_I_q

x

fp = np.memmap(path, dtype='float32', mode='w+', shape=(sample_num, 265))
while True:
            
            if processes[p_num] is None or not processes[p_num].is_alive():
                
                p = multiprocessing.Process(target=target, args=args)
                p.start()
                processes[p_num] = p
                
                break
            
            else:
                p_num = (p_num + 1)%p_count

的字符串
我的期望是,代码将逐个创建数据,但使用多个进程,将其复制到磁盘上(在memmap上),并将其保存以供以后使用。因此,如果我想创建10000个样本,它将创建(10000,265)memmap,逐个生成(265)数组数据,并将其复制到memmap。
然而,我实际上得到的是一个零数组,就好像memmap根本没有被碰过一样。
测试期间未出现错误。
P.S.我知道我的多处理代码不是最佳的。我没有研究过CS或SWE,这是作为数据生成的简短脚本。任何改进或建议将不胜感激。

mec1mxoz

mec1mxoz1#

已将多处理更改为多线程。似乎对我的目的很有效。

import numpy as np

class Simulated_Micelle:
    
    
    def __init__(
        self, 
        shape: str, 
        r: float, 
        epsilon: float, 
        N_agg: int, 
        rho_beta: float, 
        b: float, 
        N: int, 
        n: int, 
        f_core: float
    ) -> None:
        
        self.shape = shape
        self.r = r
        self.epsilon = epsilon
        self.N_agg = N_agg
        self.rho_beta = rho_beta
        self.b = b
        self.N = N
        self.n = n
        self.f_core = f_core
        
        self.scatterers = self.Generate_Micelle(
            shape=self.shape, 
            r=self.r, 
            epsilon=self.epsilon, 
            N_agg=self.N_agg, 
            rho_beta=self.rho_beta, 
            b=self.b, 
            N=self.N, 
            n=self.n, 
            f_core=self.f_core
        )

    def Spherical_Core(self, r: float, n: int) -> np.ndarray:
        
        # (n) array
        rho = np.cbrt((r**3)*np.random.rand(n))
        phi = np.pi*np.random.rand(n)
        theta = 2*np.pi*np.random.rand(n)
        
        # (n) array
        X = rho*np.sin(phi)*np.cos(theta)
        Y = rho*np.sin(phi)*np.sin(theta)
        Z = rho*np.cos(phi)
        
        # (n, 3) array
        return np.column_stack((X, Y, Z))
    
    
    def Spheroidal_Core(self, r: float, epsilon: float, n: int) -> np.ndarray:
        
        # (n, 3) array
        R = self.Spherical_Core(r=r, n=n)
        # (1, 3) array
        elongation = np.array((1, 1, epsilon))[np.newaxis, :]
        
        return R*elongation
    
    
    def Cylindrical_Core(self, r: float, epsilon: float, n: int) -> np.ndarray:
        
        # (n) array
        rho = np.sqrt((r**2)*np.random.rand(n))
        theta = 2*np.pi*np.random.rand(n)
        Z = r*epsilon*np.random.rand(n)
        
        # (n) array
        X = rho*np.cos(theta)
        Y = rho*np.sin(theta)
        
        # (n, 3) array
        return np.column_stack((X, Y, Z))
    
    
    def Vesicle_Core(self, r: float, epsilon: float, n: int) -> np.ndarray:
        
        t = r*epsilon
        
        # (n) array
        rho = np.cbrt(r**3 - (r - t)**3)*np.random.rand(n) + (r - t)
        phi = np.pi*np.random.rand(n)
        theta = 2*np.pi*np.random.rand(n)
        
        # (n) array
        X = rho*np.sin(phi)*np.cos(theta)
        Y = rho*np.sin(phi)*np.sin(theta)
        Z = rho*np.cos(phi)
        
        # (n, 3) array
        return np.column_stack((X, Y, Z))
    
    
    def Spherical_Roots(self, r: float, N_agg: int, b: float) -> np.ndarray:
        
        phi = np.pi*(np.sqrt(5.0) - 1.0)
        
        # (N_agg) array
        y = np.linspace(1, -1, N_agg)
        # (N_agg) array
        radius = np.sqrt(1 - np.power(y, 2))
        # (N_agg) array
        theta = phi*np.arange(N_agg)
    
        # (N_agg) array
        x = np.cos(theta)*radius
        z = np.sin(theta)*radius
    
        # (N_agg, 3) array
        R_temp = np.column_stack((x, y, z))
    
        # (3) array
        angles = np.random.rand(3)*(2*np.pi)
        sins = np.sin(angles)
        coss = np.cos(angles)
    
        sin_a, sin_b, sin_c = sins
        cos_a, cos_b, cos_c = coss
    
        # (3, 3) array
        rot_mat = [[cos_a*cos_b, cos_a*sin_b*sin_c - sin_a*cos_c, cos_a*sin_b*cos_c + sin_a*sin_c], 
                   [sin_a*cos_b, sin_a*sin_b*sin_c + cos_a*cos_c, sin_a*sin_b*cos_c - cos_a*sin_c], 
                   [-sin_b, cos_b*sin_c, cos_b*cos_c]]
        rot_mat = np.array(rot_mat)
    
        # (N_agg, 3) array
        node_0 = r*np.dot(R_temp, rot_mat.T)
        node_1 = (r + b)*np.dot(R_temp, rot_mat.T)
        
        # (2, N_agg, 3) array
        return np.vstack((node_0, node_1))
    
    
    def Spheroidal_Roots(
        self, 
        r: float, 
        epsilon: float, 
        N_agg: int, 
        b: float
    ) -> np.ndarray:
        
        # (2, N_agg, 3) array
        roots = self.Spherical_Roots(r=r, N_agg=N_agg, b=b)
        # (1, 1, 3) array
        elongation = np.array((1, 1, epsilon))[np.newaxis, np.newaxis, :]
        
        # (2, N_agg, 3) array
        return roots*elongation
    
    
    def Cylindrical_Roots(
        self, 
        r: float, 
        epsilon: float, 
        N_agg: int, 
        b: float
    ) -> np.ndarray:
        
        # (N_agg) array
        Z = np.linspace(0, r*epsilon, N_agg)
        theta = 2*np.pi*np.random.rand(N_agg)
        
        # (N_agg) array
        X = np.cos(theta)
        Y = np.sin(theta)
        
        # (N_agg, 3) array
        R = np.column_stack((X, Y, Z))
        
        # (N_agg, 3) array
        node_0 = R*np.array((r, r, 1))[np.newaxis, :]
        node_1 = R*np.array((r + b, r + b, 1))[np.newaxis, :]
        
        # (2, N_agg, 3) array
        return np.vstack((node_0, node_1))
    
    
    def Disk_Roots(
        self, 
        r: float, 
        epsilon: float, 
        N_agg: int, 
        b: float
    ) -> np.ndarray:
        
        # (N_agg) array
        rho = np.sqrt((r**2)*np.random.rand(N_agg))
        theta = 2*np.pi*np.random.rand(N_agg)
        Z = np.random.choice([-1, 1], size=N_agg)
        
        # (N_agg) array
        X = rho*np.cos(theta)
        Y = rho*np.sin(theta)
        Z_0 = Z*r*epsilon/2
        Z_1 = Z*(r*epsilon/2 + b)
        
        # (N_agg, 3) array
        node_0 = np.column_stack((X, Y, Z_0))
        node_1 = np.column_stack((X, Y, Z_1))
        
        # (2, N_agg, 3) array
        return np.vstack((node_0, node_1))
    
    
    def Vesicle_Roots(
        self, 
        r: float, 
        epsilon: float, 
        N_agg: int, 
        b: float
    ) -> np.ndarray:
        
        t = r*epsilon
        w_in = (r - t)**2
        w_out = r**2
        
        p = np.array((w_in, w_out))/(w_in + w_out)
        
        # (N_agg) array
        R = np.random.choice([-1, 1], size=N_agg, p=p)
        phi = np.pi*np.random.rand(N_agg)
        theta = 2*np.pi*np.random.rand(N_agg)
            
        # (N_agg) array
        R_0 = r - t/2 + t*R/2
        R_1 = r - t/2 + (t/2 + b)*R
        
        # (N_agg) array
        X_0 = R_0*np.sin(phi)*np.cos(theta)
        Y_0 = R_0*np.sin(phi)*np.sin(theta)
        Z_0 = R_0*np.cos(phi)
        
        # (N_agg) array
        X_1 = R_1*np.sin(phi)*np.cos(theta)
        Y_1 = R_1*np.sin(phi)*np.sin(theta)
        Z_1 = R_1*np.cos(phi)
        
        # (N_agg, 3) array
        node_0 = np.column_stack((X_0, Y_0, Z_0))
        node_1 = np.column_stack((X_1, Y_1, Z_1))
    
        # (2, N_agg, 3) array
        return np.vstack((node_0, node_1))
    
    
    def Generate_Nodes(
        self, 
        roots: np.ndarray, 
        core: np.ndarray, 
        r: float, 
        epsilon: float, 
        N_agg: float, 
        b: float, 
        N: int
    ) -> np.ndarray:
        
        core_num = core.shape[0]
        
        sample_num = 16
        
        # (N + 1, N_agg, 3) array
        nodes = np.empty((core_num + (N + 1)*N_agg + 1, 3))
        nodes[:core_num, :] = core
        nodes[core_num:core_num + 2*N_agg, :] = roots
        
        for i in range(1, N):
                    
            # (core_num + (i + 1)*N_agg, 3, 1, 1) array
            previous_nodes = nodes[:core_num + (i + 1)*N_agg, :]
            previous_nodes = previous_nodes[:, :, np.newaxis, np.newaxis]
            
            # (N_agg, 3) array
            start = nodes[core_num + i*N_agg:core_num + (i + 1)*N_agg, :]
                    
            # (N_agg, sample_num) array
            phi = np.pi*np.random.rand(N_agg, sample_num)
            theta = 2*np.pi*np.random.rand(N_agg, sample_num)
            
            # (N_agg, sample_num) array
            Del_Xs = b*np.sin(phi)*np.cos(theta)
            Del_Ys = b*np.sin(phi)*np.sin(theta)
            Del_Zs = b*np.cos(phi)
            
            # (3, N_agg, sample_num) array
            Del_Rs = np.stack((Del_Xs, Del_Ys, Del_Zs), axis=0)
            
            # (3, N_agg, sample_num) array
            temp_locs = np.transpose(start)[:, :, np.newaxis] + Del_Rs
            # (1, 3, N_agg, sample_num) array
            temp_locs = temp_locs[np.newaxis, :]
            
            # (core_num + (i + 1)*N_agg, 3, N_agg, sample_num) array
            Xi_vecs = previous_nodes - temp_locs
            
            # (core_num + (i + 1)*N_agg, N_agg, sample_num) array
            Xi_scas = np.sum(np.power(Xi_vecs, 2), axis=1)
            
            # (N_agg, sample_num) array
            energies = np.sum(1/np.sqrt(Xi_scas), axis=0)
            
            # (1, N_agg, sample_num) array
            weights = np.exp(-energies)[np.newaxis, :]
            
            # (3, N_agg, sample_num) array
            temp_hats = Del_Rs/b
            
            # (3, N_agg) array
            directions = np.sum(temp_hats*weights, axis=2)/np.sum(weights, axis=2)
            
            # (1, N_agg) array
            norms = np.sqrt(np.sum(np.power(directions, 2), axis=0))[np.newaxis, :]
            
            # (N_agg, 3) array
            Deltas = np.transpose(b*directions/norms)
            
            # (N_agg, 3) array
            nodes[core_num + (i + 1)*N_agg:core_num + (i + 2)*N_agg, :] = start + Deltas
        
        # (N + 1, N_agg, 3) array
        return nodes[core_num:-1, :].reshape(N + 1, N_agg, 3)
    
    
    def Generate_Scatterers(self, nodes: np.ndarray, n: int, b: float) -> None:
        
        # nodes: (N + 1, N_agg, 3) array
        node_shape = nodes.shape
        
        # (N, 1, N_agg, 3) array
        start = nodes[:-1, :, :][:, np.newaxis, :]
        
        N = node_shape[0] - 1
        N_agg = node_shape[1]
    
        R_g = np.sqrt(np.power(b, 2)/12)
        
        # (N, N_agg, 3) array
        z_hat = (nodes[1:, :, :] - nodes[:-1, :, :])/b
        # (N, N_agg, 3, 1) array
        z_hat = z_hat[:, :, :, np.newaxis]
        
        z_step = b/n
        
        # (n) array
        z_range = np.arange(n)
        # (N_agg, n) array
        z_range = np.tile(z_range, (N_agg, 1))
        # (N, N_agg, 1, n) array
        z_range = np.tile(z_range, (N, 1, 1))[:, :, np.newaxis, :]
        
        # (N, N_agg, 1, n) array
        zeta = z_step*(1 - np.abs(1 - z_range/(n/2)))
        # (N, N_agg, 1, n) array
        z = (z_range + 1)*z_step + np.random.randn(N, N_agg, 1, n)*zeta/2
        
        # (N, N_agg, 1, n) array
        rho = R_g*(1 - np.abs(1 - z/(b/2)))
        # (N, N_agg, 1, n) array
        r = rho*np.abs(np.random.randn(N, N_agg, 1, n))
    
        # (N, N_agg, 3, n) array
        R = np.random.normal(size=(N, N_agg, 3, n))
        # (N, N_agg, 3, n) array
        R = R - np.sum(R*z_hat, axis=2)[:, :, np.newaxis, :]*z_hat
        # (N, N_agg, 3, n) array
        R_hat = R/np.sqrt(np.sum(np.power(R, 2), axis=2))[:, :, np.newaxis, :]
        
        # (N, N_agg, 3, n) array
        delta = z*z_hat + r*R_hat
        # (N, n, N_agg, 3) array
        delta = delta.transpose(0, 3, 1, 2)
        
        # (N, n, N_agg, 3) array
        scatterers = start + delta
        # (N*n, N_agg, 3) array
        scatterers = scatterers.reshape(N*n, N_agg, 3)
        # (N*n + 1, N_agg, 3) array
        scatterers = np.append(nodes[0, :, :][np.newaxis, :], scatterers, axis=0)
        # ((N*n + 1)*N_agg, 3) array
        scatterers = scatterers.reshape((N*n + 1)*N_agg, 3)
        
        return scatterers
    
    
    def Calculate_R_g(self, nodes: np.ndarray, N: int, N_agg: int) -> float:
        
        # nodes: (N + 1, N_agg, 3) array

        # (N_agg, 3) array
        R_cm_i = np.sum(nodes, axis=0)/(N + 1)
        # (1, N_agg, 3) array
        R_cm_i = R_cm_i[np.newaxis, :]
        
        R_g_sq = np.sum(np.power(R_cm_i - nodes, 2))/((N + 1)*N_agg)
        
        return np.sqrt(R_g_sq)
        
    
    def Debye_Scattering(
        self, 
        nodes: np.ndarray, 
        core_node_num: int, 
        q_range: np.ndarray, 
        rho_beta: float
    ) -> np.ndarray:
        
        # nodes: (scatterer_num, 3) array
        scatterer_num = nodes.shape[0]
        temp_diag = np.diag(np.ones(scatterer_num))
        
        # (scatterer_num) array
        rhos = np.append(np.ones(core_node_num), rho_beta*np.ones(scatterer_num - core_node_num))
        # (scatterer_num, scatterer_num) array
        rhos = rhos[:, np.newaxis]*rhos[np.newaxis, :]
        
        # (scatterer_num, scatterer_num, 3) array
        r_ij = nodes[:, np.newaxis, :] - nodes[np.newaxis, :]
        # (scatterer_num, scatterer_num) array
        r_ij = np.sqrt(np.sum(np.power(r_ij, 2), axis=2)) + temp_diag
        
        # (scatterer_num, scatterer_num) array
        non_kro = 1 - temp_diag
        
        f_q = np.empty(q_range.shape)
        
        for i, q in enumerate(q_range):
            
            qr = q*r_ij
            
            f = non_kro*rhos*np.sin(qr)/qr
            f_q[i] = np.sum(f)
        
        f_q /= np.max(f_q)
        f_q[f_q <= 0] = np.min(np.abs(f_q))
        
        return f_q
    
    
    def Generate_Micelle(
        self, 
        shape: str, 
        r: float, 
        epsilon: float, 
        N_agg: int, 
        rho_beta: float, 
        b: float, 
        N: int, 
        n: int, 
        f_core: float
    ) -> np.ndarray:
        
        n_corona = n*N*N_agg
        n_core = int((f_core/(1 - f_core))*n_corona)
        
        match shape:
            
            case 'sphere':
                
                core_scatterers = self.Spherical_Core(r=r, n=n_core)
                corona_roots = self.Spherical_Roots(r=r, N_agg=N_agg, b=b)
                corona_nodes = self.Generate_Nodes(
                    roots=corona_roots, 
                    core=core_scatterers, 
                    r=r, 
                    epsilon=epsilon, 
                    N_agg=N_agg, 
                    b=b, 
                    N=N
                )
                corona_scatterers = self.Generate_Scatterers(nodes=corona_nodes, n=n, b=b)
                            
            case 'spheroid':
                
                core_scatterers = self.Spheroidal_Core(r=r, epsilon=epsilon, n=n_core)
                corona_roots = self.Spheroidal_Roots(r=r, epsilon=epsilon, N_agg=N_agg, b=b)
                corona_nodes = self.Generate_Nodes(
                    roots=corona_roots, 
                    core=core_scatterers, 
                    r=r, 
                    epsilon=epsilon, 
                    N_agg=N_agg, 
                    b=b, 
                    N=N
                )
                corona_scatterers = self.Generate_Scatterers(nodes=corona_nodes, n=n, b=b)
                            
            case 'cylinder':
                
                core_scatterers = self.Cylindrical_Core(r=r, epsilon=epsilon, n=n_core)
                corona_roots = self.Cylindrical_Roots(r=r, epsilon=epsilon, N_agg=N_agg, b=b)
                corona_nodes = self.Generate_Nodes(
                    roots=corona_roots, 
                    core=core_scatterers, 
                    r=r, 
                    epsilon=epsilon, 
                    N_agg=N_agg, 
                    b=b, 
                    N=N
                )
                corona_scatterers = self.Generate_Scatterers(nodes=corona_nodes, n=n, b=b)
            
            case 'disk':
                
                core_scatterers = self.Cylindrical_Core(r=r, epsilon=epsilon, n=n_core)
                corona_roots = self.Disk_Roots(r=r, epsilon=epsilon, N_agg=N_agg, b=b)
                corona_nodes = self.Generate_Nodes(
                    roots=corona_roots, 
                    core=core_scatterers, 
                    r=r, 
                    epsilon=epsilon, 
                    N_agg=N_agg, 
                    b=b, 
                    N=N
                )
                corona_scatterers = self.Generate_Scatterers(nodes=corona_nodes, n=n, b=b)
                
            case 'vesicle':
                
                core_scatterers = self.Vesicle_Core(r=r, epsilon=epsilon, n=n_core)
                corona_roots = self.Vesicle_Roots(r=r, epsilon=epsilon, N_agg=N_agg, b=b)
                corona_nodes = self.Generate_Nodes(
                    roots=corona_roots, 
                    core=core_scatterers, 
                    r=r, 
                    epsilon=epsilon, 
                    N_agg=N_agg, 
                    b=b, 
                    N=N
                )
                corona_scatterers = self.Generate_Scatterers(nodes=corona_nodes, n=n, b=b)
        
            case _:
                core_scatterers = self.Spherical_Core(r=r, n=n_core)
                corona_roots = self.Spherical_Roots(r=r, N_agg=N_agg, b=b)
                corona_nodes = self.Generate_Nodes(
                    roots=corona_roots, 
                    core=core_scatterers, 
                    r=r, 
                    epsilon=epsilon, 
                    N_agg=N_agg, 
                    b=b, 
                    N=N
                )
                corona_scatterers = self.Generate_Scatterers(nodes=corona_nodes, n=n, b=b)
        
        self.R_g = self.Calculate_R_g(nodes=corona_nodes, N=N, N_agg=N_agg)
        
        # (n_core + n_corona + n*N, 3) array
        scatterers = np.vstack((core_scatterers, corona_scatterers))
    
        return scatterers

def main(*args, **kwargs) -> int:
    
    return 0

if __name__ == '__main__':
    main()

字符串

相关问题