我开始在代码中包含多处理,因为我试图自动化的任务在计算上是相当昂贵的。根据我收集的stackoverflow信息,我的代码中的模块结构如下所示。我在Win10中使用Python 3.7。
main:调用多处理函数的地方,除了加载输入等。
import pandas as pd
import run
def do():
df=pd.DataFrame({'Identifier': ['id_1', 'id_1', 'id_1', 'id_1', 'id_1', 'id_2', 'id_2', 'id_2', 'id_2', 'id_2', 'id_3', 'id_3', 'id_3', 'id_3', 'id_3'],
'float_id': [1, 2, 3, 4, 5, 10, 25, 33, 45, 50, .1, .2, .3, .4, .5],
'a': np.random.rand(15),
'b': np.random.rand(15),
'c': np.random.rand(15)})
v_column=['a', 'b', 'c']
df_out=run.function_multiprocessing(df, v_column)
return df_out
if __name__=='__main__':
df_out=do()
字符串
*run:多处理函数所在位置
import defs
import pandas as pd
import multiprocessing
def iterator(data, id_col, value_col):
for col in value_col:
yield (data[col].values, data[id_col].values)
def function_multiprocessing(data, v_column):
list_df=[]
with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
for identifier, df_f in data.groupby(['Identifier']):
print(identifier)
data_f=pool.starmap(defs.function_to_apply, iterator(df_f, 'float_id', v_column))
out=pd.DataFrame(data_f, index=[identifier])
list_df.append(out)
df_out=pd.concat(list_df)
return df_out
型
*defs:要“多进程”(function_to_apply)的函数所在位置
这些模块都不属于我的PYHTONPATH,并且位于同一个文件夹中。我试图做的是对每个“标识符”应用一个函数,基于“float_id”中的值以及列“a”,“b”和“c”的值。为了简单起见,我们可以考虑基于“float_id”值的列值的加权平均。
当我执行代码时,我得到以下错误-无论我尝试什么-对每个worker一遍又一遍。
Process SpawnPoolWorker-1:
Traceback (most recent call last):
File "C:\Users\xxxx\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 297, in _bootstrap
self.run()
File "C:\Users\xxxx\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "C:\Users\xxxx\AppData\Local\Continuum\anaconda3\lib\multiprocessing\pool.py", line 110, in worker
task = get()
File "C:\Users\xxxx\AppData\Local\Continuum\anaconda3\lib\multiprocessing\queues.py", line 354, in get
return _ForkingPickler.loads(res)
ModuleNotFoundError: No module named 'defs'
型
我尝试在导入多处理模块后包含multiprocessing.set_start_method("fork")
行,但出现了一个错误。"spawn"
也是如此。我还尝试将模块“defs”作为参数包含在function_multiprocessing
方法中,并从main
调用它,但没有成功。当提供“function_to_apply”作为参数时,也会出现相同的错误。
我做错了什么?我怎么能让这个工作吗?
多谢了!
UPDATE:在function_multiprocessing
方法中导入defs.py
模块时
def function_multiprocessing(data, v_column):
import defs
list_df=[]
...
型
它不会引起任何错误。但是,当将模块作为变量提供时,它不起作用。
1条答案
按热度按时间uqdfh47h1#
也犯过类似的错误。一个解决方法是直接在
map
中调用subprocess.run
(类似于starmap
,但在这里我们不能使用它,因为它会解包subprocess.run
的参数,而不是下面的“main_script”),在python文件中使用可执行逻辑-而不是函数:字符串
在哪里
if __name__ == '__main__':
中的代码将在其中执行。您可以将function_to_apply
中的逻辑放在那里。