现在我有一个工作程序,它正在监视一个目录,寻找一个特定命名的文件。当文件出现在这个目录中时,代码将其名称更改为其他名称,然后在另一个进程中执行一些函数,以便同时处理多个文件(下面的代码可能比我蹩脚的英语更好地解释它)。我需要改变这种方式,这些文件被添加和存储在队列中,我可以限制并发工作的文件数量。
示例性电流代码:
Class WorkOnFiles(multiprocessing.Process):
def __init__(self, filename):
...
def run(self):
#do something on files
directory = "/path/to/directory"
while True:
for filename in os.listdir(directory):
if filename.startswith("ABC"):
new_filename = "XYZ" + filename
os.system(f"mv {filename} {new_filename}")
WorkOnFiles(new_filename).start()
我希望新的代码工作类似(最好不要改变“WorkOnFiles”类),但限制并发进程的数量,而这些文件的工作,新的文件被添加到某种队列等待轮到他们。类似这样的东西(下面是破碎的伪代码,只是为了展示我想象的样子):
workers = create_workers(num_of_workers = 2)
queue = make_queue()
while True:
for filename in os.listdir(directory):
if filename.startswith("ABC"):
new_filename = "XYZ" + filename
os.system(f"mv {filename} {new_filename}")
queue.add(new_filename)
if there is an idle worker and queue not empty:
workers.run(WorkOnFiles(queue.get()).start())
time.sleep(1)
因此,这段代码将创建2个worker--可以同时运行的进程。每一秒它都会检查ABC文件是否出现,它会将其新名称添加到队列中,接下来如果至少有一个空闲工作者并且队列不为空,它会启动一个新进程来处理此文件。我尝试过不同的方法,但我真的不能理解多处理是如何。队列和和多处理包工作。
1条答案
按热度按时间wztqucjr1#
在Python中有几种使用多处理的方法。开发人员可以从父进程派生一个进程,通常是为了派生一个新的工作进程来处理一个临时作业。或者开发人员可以使用“池”结构来管理多个进程。还有其他的,主要是在标准的Python
multiprocessing
模块中。在Python版本3中2,他们在其中引入了
concurrent.futures
和ProcessPoolExecutor
类。根据我的经验,这是管理多个流程的最简单方法,并且似乎非常适合您的任务,即1)限制专用工人的数量,以及2)排队任务。这两个特性都内置在executor类中。下面是一个例子,它将一个简单的计算限制为 * 只有两个派生进程 * 来处理多个任务的传入流:
上面是一个极简的例子,你可以应用到你的代码。尝试在您的机器上对上述程序计时,然后将worker的数量从2更改为4,并比较运行时间。
Python的concurrent.futures模块中有更多的多处理结构,值得熟悉。在我看来,这个模块比以往任何时候都更容易在Python中启动和管理多个进程,以便更改现有代码以获得额外的性能(当然,考虑到硬件资源)。