python - Why am I getting an error with parallel processing?

iszxjhcz · asked on 2021-08-20

I want to pass the keys and values of a dictionary to a function for parallel processing:

import multiprocessing as mp

if __name__ == "__main__":
    DATASETS = {
        "Dataset_1": data_preprocess.dataset_1,
        "Dataset_2": data_preprocess.dataset_2,}
    pool = mp.Pool(8)
    pool.starmap(main, zip(DATASETS.keys(), DATASETS.values()))
    pool.close()

Since I am not collecting any results and the output is saved directly to a CSV file from main, I did not use pool.join().

The main function:

from pathlib import Path

import pandas as pd
from sklearn.linear_model import Lasso, LinearRegression  # assuming the scikit-learn estimators


def main(dataset_name, generate_dataset):
    REGRESSORS = {
        "LinReg": LinearRegression(),
        "Lasso": Lasso(),}
    ROOT = Path(__file__).resolve().parent
    dfs = []
    for reg_name, regressor in REGRESSORS.items():
        df = function_calling(
            generate_dataset=generate_dataset,
            regressor=regressor,
            reg_name=reg_name,)
        print(df)
        dfs.append(df)
    df = pd.concat(dfs, axis=0, ignore_index=True)
    filename = dataset_name + "_result.csv"
    outfile = str(ROOT / filename)
    df.to_csv(outfile)

I am getting the error AssertionError: daemonic processes are not allowed to have children. Can you tell me why this error occurs and how I can fix it?

tzcvj98z

The error occurs because multiprocessing.Pool workers are daemonic processes, and a daemonic process is not allowed to start child processes of its own; since main runs inside those pool workers, something it calls (presumably function_calling or one of the regressors) is trying to spawn a process. Process instances you create yourself are not daemonic by default, so one fix is to create your own Process instances:

import multiprocessing as mp

def main(dataset_name, generate_dataset):
    print(dataset_name, generate_dataset, flush=True)
    ... # etc.

if __name__ == "__main__":
    DATASETS = {
        "Dataset_1": 1,
        "Dataset_2": 2,}
    processes = [mp.Process(target=main, args=(k, v)) for k, v in DATASETS.items()]
    for process in processes:
        process.start()
    # wait for termination:
    for process in processes:
        process.join()

Prints:

Dataset_1 1
Dataset_2 2
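
For context, the original AssertionError can be reproduced with a very small example in which a Pool worker (which is daemonic) tries to start a Process of its own; the worker function below is a hypothetical stand-in for whatever main or function_calling does internally:

import multiprocessing as mp

def child():
    pass

def worker(_):
    # Pool workers run with daemon=True, so starting a child process here
    # raises: AssertionError: daemonic processes are not allowed to have children
    p = mp.Process(target=child)
    p.start()
    p.join()

if __name__ == "__main__":
    with mp.Pool(2) as pool:
        pool.map(worker, range(2))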

The problem is this: suppose you have 8 CPU cores and DATASETS has 100 key/value pairs. You would then be creating 100 processes, and if those processes are CPU-intensive you cannot expect more than 8 of them to really be doing anything productive at any one time, yet you have incurred the CPU and memory overhead of creating all of them. But as long as the number of processes you create is not much greater than the number of CPU cores you have, and main does not need to return a value back to the main process, this approach should be fine.
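
If DATASETS does grow well beyond the number of cores, one simple (if coarse) way to stay within that limit while still using plain Process objects is to start them in batches. This is only a minimal sketch, assuming the same main as above; the 100-entry DATASETS here is purely illustrative:

import multiprocessing as mp

def main(dataset_name, generate_dataset):
    print(dataset_name, generate_dataset, flush=True)
    ... # etc.

if __name__ == "__main__":
    DATASETS = {f"Dataset_{i}": i for i in range(1, 101)}  # illustrative: many key/value pairs
    items = list(DATASETS.items())
    batch_size = mp.cpu_count()
    # start at most batch_size processes at a time and wait for each
    # batch to finish before starting the next one
    for start in range(0, len(items), batch_size):
        batch = items[start:start + batch_size]
        processes = [mp.Process(target=main, args=(k, v)) for k, v in batch]
        for process in processes:
            process.start()
        for process in processes:
            process.join()
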
There is also a way to implement your own multiprocessing pool using these Process instances and a Queue instance, but it is a bit more involved:

import multiprocessing as mp

def main(dataset_name, generate_dataset):

    print(dataset_name, generate_dataset, flush=True)
    ... # etc.

def worker(queue):
    while True:
        arg = queue.get()
        if arg is None:
            # signal to terminate
            break
        # unpack
        dataset_name, generate_dataset = arg
        main(dataset_name, generate_dataset)

if __name__ == "__main__":
    DATASETS = {
        "Dataset_1": 1,
        "Dataset_2": 2,}
    queue = mp.Queue()
    items = list(DATASETS.items())
    for k, v in items:
        # put the arguments on the queue
        queue.put((k, v))
    # number of processors we will be using:
    n_processors = min(mp.cpu_count(), len(items))
    for _ in range(n_processors):
        # special value to tell the workers there is no more work: one sentinel for each worker process
        queue.put(None)
    processes = [mp.Process(target=worker, args=(queue,)) for _ in range(n_processors)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
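
Each worker consumes exactly one None sentinel and then exits, which is why n_processors sentinels are put on the queue (one per worker) after all of the real arguments; the final join() loop then simply waits until every worker has drained its share of the queue and terminated.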
