I have a text file of roughly 300 GiB with a header followed by data records. Here is a dummy input.txt:
# header
# etc... (the number of lines in the header can vary)
record #1
record #2
record #3
record #4
record #5
record #6
record #7
record #8
record #9
...
Given the size of the input file, processing it one line at a time is slow and the work is CPU-bound, so I decided to add some parallelism to the code.
Here is a sample of the code (it took me some time to get it right; it now works as expected):
#!/usr/bin/env python
import sys
import multiprocessing

def worker(queue, lock, process_id):
    while True:
        data = queue.get()
        if data is None:
            break
        # whatever processing that takes some time
        for x in range(1000000):
            data.split()
        lock.acquire()
        print(data.rstrip() + " computed by process #" + process_id)
        sys.stdout.flush()
        lock.release()

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    lock = multiprocessing.Lock()
    workers = []
    num_processes = 4
    for process_id in range(num_processes):
        p = multiprocessing.Process(target=worker, args=(queue, lock, str(process_id)))
        p.start()
        workers.append(p)
    with open('input.txt') as handler:
        try:
            # read the header
            line = next(handler)
            while line.startswith('#'):
                line = next(handler)
            # send the records to the queue
            while True:
                queue.put(line)
                line = next(handler)
        except StopIteration:
            # one sentinel per worker so each one shuts down
            for p in workers:
                queue.put(None)
        finally:
            for p in workers:
                p.join()
Output:
record #4 computed by process #2
record #3 computed by process #1
record #1 computed by process #0
record #2 computed by process #3
record #5 computed by process #2
record #6 computed by process #1
record #7 computed by process #0
record #8 computed by process #3
record #9 computed by process #2
My problem is that the order of the output should be the same as the order in the input. How can I do this efficiently?
**AL:** The architecture of the code may look odd (I didn't find any example that looks like my code), so if there is another, more standard and more efficient way to do the same thing, it would be great if you could share it.
1 Answer
pool.map() looks like exactly what you want, and it is a lot less hassle. If you don't pass processes=4, it will use all the CPUs on your machine, which is probably what you want, and it does not create a new process for every line.
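A minimal sketch of that approach, assuming line_iterator() is a helper that skips the leading header lines and process_line() stands in for the per-record work from the question (both bodies are guesses, since the answer's own code is not shown here):

#!/usr/bin/env python
import multiprocessing

def process_line(line):
    # stand-in for whatever CPU-bound processing each record needs
    for x in range(1000000):
        line.split()
    return line.rstrip() + " computed"

def line_iterator(path):
    # skip the leading '#' header lines, then yield every record
    with open(path) as handler:
        in_header = True
        for line in handler:
            if in_header and line.startswith('#'):
                continue
            in_header = False
            yield line

if __name__ == '__main__':
    # with no process count given, the pool defaults to one worker per CPU
    with multiprocessing.Pool() as pool:
        # map() returns the results in the same order as the input lines
        for result in pool.map(process_line, line_iterator('input.txt')):
            print(result)

Note that map() materializes its entire input before dispatching work, which matters for a 300 GiB file; pool.imap(process_line, line_iterator('input.txt'), chunksize=1000) streams the records instead and still yields the results in input order.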
line_iterator() could probably be made more efficient, but that won't be the bottleneck.

Update: just realized that the core of line_iterator() can easily be replaced with a single itertools.dropwhile statement. That is exactly what that function is for! Oh well, not the important part of this answer.
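For illustration, the dropwhile variant the update describes might look like this (again a sketch, not the answer's actual code):

import itertools

def line_iterator(path):
    # dropwhile discards the leading '#' header lines and then
    # passes every remaining line through unchanged
    with open(path) as handler:
        yield from itertools.dropwhile(lambda l: l.startswith('#'), handler)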