python-3.x 从另一个模块调用时,多处理出错,模块未找到

vql8enpb  于 2023-08-08  发布在  Python
关注(0)|答案(1)|浏览(72)

我开始在代码中包含多处理,因为我试图自动化的任务在计算上是相当昂贵的。根据我收集的stackoverflow信息,我的代码中的模块结构如下所示。我在Win10中使用Python 3.7。

main:调用多处理函数的地方,除了加载输入等。

import pandas as pd
import run

def do():
    df=pd.DataFrame({'Identifier': ['id_1', 'id_1', 'id_1', 'id_1', 'id_1', 'id_2', 'id_2', 'id_2', 'id_2', 'id_2', 'id_3', 'id_3', 'id_3', 'id_3', 'id_3'],
                 'float_id': [1, 2, 3, 4, 5, 10, 25, 33, 45, 50, .1, .2, .3, .4, .5],
                 'a': np.random.rand(15),
                 'b': np.random.rand(15),
                 'c': np.random.rand(15)})
    
    v_column=['a', 'b', 'c']
    
    df_out=run.function_multiprocessing(df, v_column)

    return df_out

if __name__=='__main__':
    df_out=do()

字符串

*run:多处理函数所在位置

import defs
import pandas as pd

import multiprocessing

def iterator(data, id_col, value_col):
    for col in value_col:
        yield (data[col].values, data[id_col].values)

def function_multiprocessing(data, v_column):
    list_df=[]
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        for identifier, df_f in data.groupby(['Identifier']): 
            print(identifier)
            data_f=pool.starmap(defs.function_to_apply, iterator(df_f, 'float_id', v_column))
            
            out=pd.DataFrame(data_f, index=[identifier])
            list_df.append(out)
    
    df_out=pd.concat(list_df)
    
    return df_out

*defs:要“多进程”(function_to_apply)的函数所在位置

这些模块都不属于我的PYHTONPATH,并且位于同一个文件夹中。我试图做的是对每个“标识符”应用一个函数,基于“float_id”中的值以及列“a”,“b”和“c”的值。为了简单起见,我们可以考虑基于“float_id”值的列值的加权平均。
当我执行代码时,我得到以下错误-无论我尝试什么-对每个worker一遍又一遍。

Process SpawnPoolWorker-1:
Traceback (most recent call last):
  File "C:\Users\xxxx\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 297, in _bootstrap
    self.run()
  File "C:\Users\xxxx\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\xxxx\AppData\Local\Continuum\anaconda3\lib\multiprocessing\pool.py", line 110, in worker
    task = get()
  File "C:\Users\xxxx\AppData\Local\Continuum\anaconda3\lib\multiprocessing\queues.py", line 354, in get
    return _ForkingPickler.loads(res)
ModuleNotFoundError: No module named 'defs'


我尝试在导入多处理模块后包含multiprocessing.set_start_method("fork")行,但出现了一个错误。"spawn"也是如此。我还尝试将模块“defs”作为参数包含在function_multiprocessing方法中,并从main调用它,但没有成功。当提供“function_to_apply”作为参数时,也会出现相同的错误。
我做错了什么?我怎么能让这个工作吗?
多谢了!

UPDATE:在function_multiprocessing方法中导入defs.py模块时

def function_multiprocessing(data, v_column):
    import defs
    
    list_df=[]
    ...


它不会引起任何错误。但是,当将模块作为变量提供时,它不起作用。

uqdfh47h

uqdfh47h1#

也犯过类似的错误。一个解决方法是直接在map中调用subprocess.run(类似于starmap,但在这里我们不能使用它,因为它会解包subprocess.run的参数,而不是下面的“main_script”),在python文件中使用可执行逻辑-而不是函数:

pool.map(subprocess.run, [sys.executable, main_script, *args])

字符串
在哪里

  • sys.executable给你你的python可执行文件
  • main_script是一个字符串,它保存了python文件的完整路径,if __name__ == '__main__':中的代码将在其中执行。您可以将function_to_apply中的逻辑放在那里。
  • args:可选的给定参数,您可以将其解析为该脚本的命令行参数。如果你不需要这个,你可以从上面的列表中省略它。

相关问题