我手头的当前任务需要多个数组操作,这比可行的时间要长。我正试图利用多处理包来加速这个过程,但我目前在混合使用np.memmap和多处理时遇到了问题。具体来说,我似乎无法写入内存Map或在之后保存它。
目前的计划是在磁盘上创建一个大的numpy memmap,生成数据并将其逐个复制到memmap上,然后保存以供以后使用。问题似乎来自写入或保存部分,因为数据生成部分工作正常。下面是示例代码。
import os
import multiprocessing
import numpy as np
from datetime import date
from tqdm.auto import tqdm
from Micelle_Simulation import Simulated_Micelle
def target(
id_: int,
q_range: np.ndarray,
fp: np.memmap,
shape: str,
r: float,
epsilon: float,
N_agg: int,
rho_beta: float,
b: float,
N: int,
n: int,
f_core: float
) -> None:
micelle = Simulated_Micelle(
shape=shape,
r=r,
epsilon=epsilon,
N_agg=N_agg,
rho_beta=rho_beta,
b=b,
N=N,
n=n,
f_core=f_core
)
n_corona = n*N*N_agg
n_core = int((f_core/(1 - f_core))*n_corona)
params = np.array((
r,
epsilon,
micelle.R_g,
N_agg,
rho_beta,
b,
N,
n,
f_core
))
I_q = micelle.Debye_Scattering(
nodes=micelle.scatterers,
core_node_num=n_core,
q_range=q_range,
rho_beta=rho_beta
)
params_I_q = np.hstack((params, I_q))
fp[id_, :] = params_I_q
print(params_I_q)
def main(*args, **kwargs) -> int:
remainder = 2
p_count = os.cpu_count() - remainder
processes = []
for i in range(p_count):
processes.append(None)
sample_num = 2
shape = 'sphere'
cwd = os.getcwd()
today = str(date.today()).replace("-", "_")
username = os.getlogin()
end = ".npy"
file_name = f"{today}_{username}"
attempt = 0
while True:
temp_name = f"{file_name}_{attempt}{end}"
temp_path = os.path.join(cwd, temp_name)
if not os.path.isfile(temp_path):
path = temp_path
break
else:
attempt += 1
fp = np.memmap(path, dtype='float32', mode='w+', shape=(sample_num, 265))
# storage = np.empty((sample_num, 265))
q_log_range = np.arange(-2, 0, np.true_divide(1, 128))
q_range = np.power(10, q_log_range - 2*np.log10(2))
p_num = 0
print(f'Using {p_count} threads.')
for i in tqdm(range(sample_num)):
r = np.power(2, np.random.uniform(6, 10))
epsilon = 1
N_agg = np.random.randint(8, 33)
rho_beta = np.random.normal(0.1, 0.02)
b = np.power(2, np.random.uniform(4, 7))
N = np.random.randint(2, 9)
n = 6
f_core = np.random.normal(0.7, 0.05)
args = (i, q_range, fp, shape, r, epsilon, N_agg, rho_beta, b, N, n, f_core)
while True:
if processes[p_num] is None or not processes[p_num].is_alive():
p = multiprocessing.Process(target=target, args=args)
p.start()
processes[p_num] = p
break
else:
p_num = (p_num + 1)%p_count
print('Loop completed.')
for process in processes:
if process:
process.join()
else:
continue
print("Task completed.")
fp.flush()
print(fp)
return 0
if __name__ == '__main__':
main()
字符串
忽略任何涉及Micelle_Simulation的代码部分。它只创建一个(265,)数组。
问题似乎来自代码的以下部分。
fp[id_, :] = params_I_q
x
fp = np.memmap(path, dtype='float32', mode='w+', shape=(sample_num, 265))
while True:
if processes[p_num] is None or not processes[p_num].is_alive():
p = multiprocessing.Process(target=target, args=args)
p.start()
processes[p_num] = p
break
else:
p_num = (p_num + 1)%p_count
的字符串
我的期望是,代码将逐个创建数据,但使用多个进程,将其复制到磁盘上(在memmap上),并将其保存以供以后使用。因此,如果我想创建10000个样本,它将创建(10000,265)memmap,逐个生成(265)数组数据,并将其复制到memmap。
然而,我实际上得到的是一个零数组,就好像memmap根本没有被碰过一样。
测试期间未出现错误。
P.S.我知道我的多处理代码不是最佳的。我没有研究过CS或SWE,这是作为数据生成的简短脚本。任何改进或建议将不胜感激。
1条答案
按热度按时间mec1mxoz1#
已将多处理更改为多线程。似乎对我的目的很有效。
字符串