在Django或Flask的终端和代码模块中使用python多处理池

oxcyiej7  于 2022-12-14  发布在  Go
关注(0)|答案(3)|浏览(125)

当在python中使用multiprocessing.pool和下面的代码时,会出现一些奇怪的行为。

from multiprocessing import Pool
p = Pool(3)
def f(x): return x
threads = [p.apply_async(f, [i]) for i in range(20)]
for t in threads:
    try: print(t.get(timeout=1))
    except Exception: pass

我得到以下错误三次(池中的每个线程一次),它输出“3”到“19”:

AttributeError: 'module' object has no attribute 'f'

前三个apply_async调用从不返回。
同时,如果我尝试:

from multiprocessing import Pool
p = Pool(3)
def f(x): print(x)
p.map(f, range(20))

我得到AttributeError 3次,shell打印“6”到“19”,然后挂起,无法通过[Ctrl] + [C]杀死
多重处理文档有以下内容:
此包中的功能要求main模块可由子模块导入。
这是什么意思?
为了澄清,我在终端中运行代码来测试功能,但最终我希望能够将其放到Web服务器的模块中。如何在python终端和代码模块中正确使用multiprocessing.pool?

wwtsj6pe

wwtsj6pe1#

**警告:**在Django和Flask这样的Web服务器环境中使用多处理是错误的工具。相反,你应该使用Celery这样的任务框架或Elastic Beanstalk Worker Environments这样的基础设施解决方案。使用多处理来产生线程或进程是不好的,因为它没有给你对这些线程/进程的监督或管理,所以你必须构建自己的故障检测逻辑,重试逻辑等。在这一点上,你最好使用一个现成的工具,实际上是用来处理异步任务的,因为它会给你这些开箱即用的东西。

了解文档

包中的功能要求主模块可由子模块导入。

这意味着池必须在定义了要在其上运行的函数之后进行初始化。(比如Django或者Flask项目)所以,如果你想在其中一个项目中使用Pools,请确保遵循以下指导原则,以下各节对此进行了说明:
1.尽可能在函数内部初始化池。如果必须在全局范围内初始化池,请在模块底部进行初始化。
1.不要在全局范围内调用Pool的方法。
或者,如果您只需要在I/O(如数据库访问或网络调用)上实现更好的并行性,您可以保存所有这些麻烦,使用线程池而不是进程池。

from multiprocessing.pool import ThreadPool

它的接口与Pool的接口完全相同,但由于它使用线程而不是进程,因此没有使用进程池时需要注意的问题,唯一的缺点是您无法获得代码执行的真正并行性,只能获得阻塞I/O的并行性。

池必须在定义要在其上运行的函数之后进行初始化

python文档中的晦涩文字意味着在定义池时,周围的模块由池中的线程导入,对于python终端,这意味着到目前为止您运行过的所有代码。
因此,任何要在池中使用的函数都必须在池初始化之前定义。无论是模块中的代码还是终端中的代码都是如此。对问题中的代码进行以下修改将能很好地工作:

from multiprocessing import Pool
def f(x): return x  # FIRST
p = Pool(3) # SECOND
threads = [p.apply_async(f, [i]) for i in range(20)]
for t in threads:
    try: print(t.get(timeout=1))
    except Exception: pass

或者

from multiprocessing import Pool
def f(x): print(x)  # FIRST
p = Pool(3) # SECOND
p.map(f, range(20))

我说的好,是指在Unix上很好。Windows有它自己的问题,我在这里就不谈了。

在模块中使用池

但是等等,还有更多(在您希望导入到其他地方的模块中使用池)!
如果你在一个函数中定义一个池,你不会有任何问题。但是如果你在一个模块中使用一个Pool对象作为全局变量,它必须在页面的 * 底部 * 定义,而不是在页面的顶部。虽然这与大多数优秀的代码风格不符,但它是功能所必需的。在页面顶部声明池的方法是只将它与从其他模块导入的函数一起使用。像这样:

from multiprocessing import Pool
from other_module import f
p = Pool(3)
p.map(f, range(20))

从另一个模块导入预配置的池是非常可怕的,因为导入必须在您希望在其上运行的任何内容之后进行,如下所示:

### module.py ###
from multiprocessing import Pool
POOL = Pool(5)

### module2.py ###
def f(x):
    # Some function
from module import POOL
POOL.map(f, range(10))

第二,如果您在池中运行要导入的模块的全局范围内的任何内容,系统将挂起。也就是说,这 * 不 * 起作用:

### module.py ###
from multiprocessing import Pool
def f(x): return x
p = Pool(1)
print(p.map(f, range(5)))

### module2.py ###
import module

但是,只要没有任何内容导入module 2,这 * 确实 * 起作用:

### module.py ###
from multiprocessing import Pool

def f(x): return x
p = Pool(1)
def run_pool(): print(p.map(f, range(5)))

### module2.py ###
import module
module.run_pool()

现在,这背后的原因更奇怪了,可能是因为问题中的代码每次只吐出一个属性错误,之后看起来就能正确执行代码。另外,池线程(至少有一定的可靠性)在执行后重新加载模块中的代码。

2ic8powd

2ic8powd2#

创建线程池时,必须已经定义了要在线程池上执行的函数。
这应该可行:

from multiprocessing import Pool

def f(x): print(x)

if __name__ == '__main__':
    p = Pool(3)
    p.map(f, range(20))

原因是(至少在基于Unix的系统上,它有fork)当你创建一个池时,worker是通过分叉当前进程来创建的,所以如果目标函数在那时还没有定义,worker就不能调用它。
在Windows上,Windows doesn't have fork稍有不同。在这里,新的工作进程被启动,主模块被导入。这就是为什么在Windows上,用if __name__ == '__main__'保护正在执行的代码是很重要的。否则,每个新的工作进程都会重新执行代码,从而无限地产生新的进程,使程序(或系统)崩溃。

velaa5lx

velaa5lx3#

这个错误还有另一个可能的来源。我在运行示例代码时得到了这个错误。
问题的根源是,尽管已经正确安装了多处理,但我的系统上没有安装C++编译器,这是pip在尝试更新多处理时通知我的。因此,可能值得检查一下编译器是否安装了。

相关问题