Python multiprocessing: how do I output a text file's computed records in the order they were read?

bsxbgnwa posted on 2023-03-28 in Python

I have a text file about 300 GiB in size, with a header followed by data records. Here is a dummy input.txt:

# header
# etc... (the number of lines in the header can vary) 
record #1
record #2
record #3
record #4
record #5
record #6
record #7
record #8
record #9
...

Given the size of the input file, processing it one line at a time is slow, and the work is CPU-bound, so I decided to add some parallelism to the code.
Here is a sample of the code (it took me a while to get it right; it now works as expected):

#!/usr/bin/env python
import sys
import multiprocessing

def worker(queue, lock, process_id):

    while True:
        data = queue.get()
        if data is None:  # sentinel value: no more records to process
            break

        # whatever processing that takes some time
        for x in range(1000000):
            data.split()

        # serialize writes to stdout
        lock.acquire()
        print(data.rstrip() + " computed by process #" + process_id)
        sys.stdout.flush()
        lock.release()

if __name__ == '__main__':

    queue = multiprocessing.Queue()
    lock = multiprocessing.Lock()
    workers = []
    num_processes = 4

    for process_id in range(num_processes):
        p = multiprocessing.Process(target=worker, args=(queue, lock, str(process_id)))
        p.start()
        workers.append(p)

    with open('input.txt') as handler:
        try:
            # read the header
            line = next(handler)
            while line.startswith('#'):
                line = next(handler)

            # send the records to the queue
            while True:
                queue.put(line)
                line = next(handler)

        except StopIteration:
            for p in workers:
                queue.put(None)

        finally:
            for p in workers:
                p.join()

Output:

record #4 computed by process #2
record #3 computed by process #1
record #1 computed by process #0
record #2 computed by process #3
record #5 computed by process #2
record #6 computed by process #1
record #7 computed by process #0
record #8 computed by process #3
record #9 computed by process #2

My problem is that the order of the records in the output should be the same as in the input. How can I achieve that efficiently?
**NB:** the architecture of the code may look odd (I could not find any examples that look like my code), so if there is another, more standard and more efficient way of doing the same thing, it would be great if you could share it.


bkkx9g8r #1

pool.imap() looks like exactly what you want, and it is a lot less hassle: it farms the lines out to a pool of workers in parallel, but yields the results back in the order of the input iterable, which is precisely the ordering guarantee you are after.

import multiprocessing

def worker(line):
    result = do_whatever_to(line)  # placeholder for your per-record processing
    return result


def line_iterator():
    '''Yields the lines we need to process, skipping the header.'''
    with open('input.txt') as handler:
        seen_real_line = False
        for line in handler:
            if seen_real_line or not line.startswith('#'):
                yield line
                seen_real_line = True

def main():
    with multiprocessing.Pool(processes=4) as pool:
        for result in pool.imap(worker, line_iterator()):
            print(result)

if __name__ == '__main__':
    main()

If you leave out processes=4, it will use all of the CPUs on your machine, which is probably what you want. And it does not create a new process for each line; the pool's worker processes are reused.
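On a 300 GiB input, one detail that may be worth tuning: imap() also accepts a chunksize argument, which batches records per task so that each line does not pay its own inter-process round trip. A minimal sketch, reusing the worker and line_iterator from above (the value 1000 is an arbitrary starting point to tune, not a recommendation):

import multiprocessing

def main():
    # Pool() with no arguments uses one worker process per CPU.
    with multiprocessing.Pool() as pool:
        # chunksize batches lines per worker task, cutting IPC overhead;
        # imap() still yields the results in input order.
        for result in pool.imap(worker, line_iterator(), chunksize=1000):
            print(result)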
line_iterator() could be made more efficient, but it is not going to be the bottleneck.
Update: I just realized that the core of line_iterator() can easily be replaced with a single itertools.dropwhile call. That is exactly what that function is for! Oh well, it is not the important part of this answer.
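For reference, a sketch of what that dropwhile version could look like, under the same assumption as above that the header is exactly the run of leading '#' lines:

import itertools

def line_iterator():
    '''Yields the lines we need to process, skipping the header.'''
    with open('input.txt') as handler:
        # dropwhile skips lines while the predicate is true, then yields
        # every remaining line unchanged.
        yield from itertools.dropwhile(lambda line: line.startswith('#'), handler)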
