python-3.x 有限工作者多处理队列的实现

aelbi1ox  于 2023-05-02  发布在  Python
关注(0)|答案(1)|浏览(184)

现在我有一个工作程序,它正在监视一个目录,寻找一个特定命名的文件。当文件出现在这个目录中时,代码将其名称更改为其他名称,然后在另一个进程中执行一些函数,以便同时处理多个文件(下面的代码可能比我蹩脚的英语更好地解释它)。我需要改变这种方式,这些文件被添加和存储在队列中,我可以限制并发工作的文件数量。
示例性电流代码:

Class WorkOnFiles(multiprocessing.Process):
   def __init__(self, filename):

   ...

   def run(self):
      #do something on files

directory = "/path/to/directory"

while True:
   for filename in os.listdir(directory):
      if filename.startswith("ABC"):
         new_filename = "XYZ" + filename
         os.system(f"mv {filename} {new_filename}")
         WorkOnFiles(new_filename).start()

我希望新的代码工作类似(最好不要改变“WorkOnFiles”类),但限制并发进程的数量,而这些文件的工作,新的文件被添加到某种队列等待轮到他们。类似这样的东西(下面是破碎的伪代码,只是为了展示我想象的样子):

workers = create_workers(num_of_workers = 2)
queue = make_queue()

while True:
   for filename in os.listdir(directory):
      if filename.startswith("ABC"):
         new_filename = "XYZ" + filename
         os.system(f"mv {filename} {new_filename}")
       
        
         queue.add(new_filename)

      if there is an idle worker and queue not empty:
         workers.run(WorkOnFiles(queue.get()).start())
      time.sleep(1)

因此,这段代码将创建2个worker--可以同时运行的进程。每一秒它都会检查ABC文件是否出现,它会将其新名称添加到队列中,接下来如果至少有一个空闲工作者并且队列不为空,它会启动一个新进程来处理此文件。我尝试过不同的方法,但我真的不能理解多处理是如何。队列和和多处理包工作。

wztqucjr

wztqucjr1#

在Python中有几种使用多处理的方法。开发人员可以从父进程派生一个进程,通常是为了派生一个新的工作进程来处理一个临时作业。或者开发人员可以使用“池”结构来管理多个进程。还有其他的,主要是在标准的Python multiprocessing模块中。
在Python版本3中2,他们在其中引入了concurrent.futuresProcessPoolExecutor类。根据我的经验,这是管理多个流程的最简单方法,并且似乎非常适合您的任务,即1)限制专用工人的数量,以及2)排队任务。这两个特性都内置在executor类中。
下面是一个例子,它将一个简单的计算限制为 * 只有两个派生进程 * 来处理多个任务的传入流:

from time import sleep
from concurrent.futures import ProcessPoolExecutor as Executor

def f(a):
    sleep(0.01)
    
executor = Executor(2)
for x in range(100):
    executor.submit(f, x)

executor.shutdown()

上面是一个极简的例子,你可以应用到你的代码。尝试在您的机器上对上述程序计时,然后将worker的数量从2更改为4,并比较运行时间。
Python的concurrent.futures模块中有更多的多处理结构,值得熟悉。在我看来,这个模块比以往任何时候都更容易在Python中启动和管理多个进程,以便更改现有代码以获得额外的性能(当然,考虑到硬件资源)。

相关问题