python-3.x 使用多处理功能处理巨型线的正确方法

lx0bsm1f  于 2023-01-14  发布在  Python
关注(0)|答案(1)|浏览(143)

我有点陷入两难的“如何处理巨行文件与多处理”。在计划中,我将发送一个requests每行。该文件包含一个巨大的子域列表由1-10百万行,虽然文件大小是在1 GB以下,但我担心我的实现将耗尽我的内存。
我使用Queue()将任务分配给每个进程。

from multiprocessing import Queue

tasker = Queue()
with open(processor, 'r') as f:
    for line in f:
        tasker.put(line.strip())

Multiprocessing最终会变慢,所以一定是因为Queue()的增长速度超过了进程的完成速度。至于Alternative,我使用itertools.islice()函数,它将产生4个任务,然后立即使用它。

from itertools import islice
from multiprocessing import Queue

tasker = Queue()
with open('file.txt', 'r') as f:
    for line in f:
        liner = [line] + list(islice(f, 4))
        for i in liner:
            tasker.put(str(re.sub('\n', '', i.strip())))

最后,Multiprocessing运行在一个合理的时间内,没有减速,但它创建了一个包含4个项目的列表,这些项目将被追加到Queue()中,直到列表为空。所以它有点把项目从一个列表放到另一个列表中,奇怪,我知道。
有没有更好的方法来处理这个问题,而不需要过多的内存?

n3schb8v

n3schb8v1#

这两个代码段在生成的输出中是等效的,第二个只是速度慢一些,并且只是通过“停止CPU”来修复问题。
限制内存使用的正确方法是限制队列本身,队列接受其最大大小作为参数。

from multiprocessing import Queue
max_size = 1000
tasker = Queue(max_size)

如果主进程尝试在其中放置更多任务,则它将阻塞主进程,直到工作者完成一些工作,这在CPU和内存使用方面将是快速和高效的,(主进程将允许工作者使用其CPU核心来完成一些工作,因此完成更多工作,而不消耗更多内存)
如果您不希望主进程因为有其他事情要做而被阻塞,那么您可以在主进程中使用threading模块将任务放入队列中
如果您想减少锁定队列的开销,那么您应该分块提交工作。

from itertools import islice
from multiprocessing import Queue

max_size = 200  # note: 200 * 5 = 1000
tasker = Queue(max_size)
with open('file.txt', 'r') as f:
    for line in f:
        liner = [line] + list(islice(f, 4))
        liner = [x.strip() for x in liner]
        tasker.put(liner)  # put the list of 5 items in the queue

并且期望消费者循环从队列获得的列表并独立地执行接收到的列表中的每个任务。

相关问题