如何使用python跳过二进制stdin的n行?

rdlzhqv9  于 2021-05-27  发布在  Hadoop
关注(0)|答案(1)|浏览(477)

我使用hadoopcli将二进制数据传输到hadoop集群上的python脚本。二进制数据有终止符,用于标识新文档的开始位置。记录按唯一标识符排序,该标识符从1000000001开始,增量为1。
我试图只保存字典中这些id的一个子集的数据。
我当前的过程是使用以下命令从cli中选择数据:

  1. hadoop select "Database" "Collection" | cut -d$'\t' -f2 | python script.py

然后在script.py中处理它,如下所示:

  1. import json
  2. import sys
  3. member_mapping = json.load(open('member_mapping.json'))
  4. output = []
  5. for line in sys.stdin:
  6. person = json.loads(line)
  7. if member_mapping.get(person['personId']):
  8. output.append({person['personId']: person})
  9. if len(output) == len(member_mapping):
  10. break

问题是这个二进制数据中有6.5m个ID,扫描几乎需要2个小时。我知道字典中的min()和max()id,你可以在我的代码中看到,当我保存了n个文档时,我很早就停止了,其中n是Map文件的长度。
我想通过跳过尽可能多的读取来提高这个过程的效率。如果id从1000000001开始,我想保存的第一个id是100001000001,我可以跳过10000行吗?
由于系统问题,我目前无法使用spark或任何其他可能改进此过程的工具,因此我现在需要坚持使用python和hadoop cli的解决方案。

xqk2d5yq

xqk2d5yq1#

你可以试着用 enumerate 和一个阈值,然后跳过任何你不关心的输入。这不是一个直接的解决方案,但是应该运行得更快,并且很快就将前10000行代码扔掉。

  1. for lineNum, line in enumerate(sys.stdin):
  2. if(lineNum < 10000):
  3. continue
  4. person = json.loads(line)
  5. if member_mapping.get(person['personId']):
  6. output.append({person['personId']: person})
  7. if len(output) == len(member_mapping):
  8. break

相关问题