Error reading/writing an HDFS file from Python using subprocess, Popen, and PIPE

wnrlj8wa · posted 2021-06-04 in Hadoop
Answers (2) | Views (716)

I am trying to read (open) and write an HDFS file from a Python script, but I get errors. Can someone tell me what is wrong here?
The full code, sample.py:

    #!/usr/bin/python
    from subprocess import Popen, PIPE

    print "Before Loop"
    cat = Popen(["hadoop", "fs", "-cat", "./sample.txt"],
                stdout=PIPE)
    print "After Loop 1"
    put = Popen(["hadoop", "fs", "-put", "-", "./modifiedfile.txt"],
                stdin=PIPE)
    print "After Loop 2"
    for line in cat.stdout:
        line += "Blah"
        print line
        print "Inside Loop"
        put.stdin.write(line)
    cat.stdout.close()
    cat.wait()
    put.stdin.close()
    put.wait()

When I run:

    hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.5.1.jar -file ./sample.py -mapper './sample.py' -input sample.txt -output fileRead

it completes fine, but I cannot find the modifiedfile.txt that should have been created in HDFS.
When I then run:

    hadoop fs -getmerge ./fileRead/ file.txt

I get the following in file.txt:

    Before Loop
    Before Loop
    After Loop 1
    After Loop 1
    After Loop 2
    After Loop 2

Can someone tell me what I am doing wrong? I don't think it is reading from sample.txt at all.

jv4diomz · answer #1

"Can someone tell me what I am doing wrong?"
Your sample.py is probably not a proper mapper. A mapper should read its input from stdin and write its results to stdout, e.g. blah.py:

    #!/usr/bin/env python
    import sys

    for line in sys.stdin:
        line = line.rstrip("\n") + "Blah"  # strip the newline first, then append
        print(line)

Usage:

    $ hadoop ... -file ./blah.py -mapper './blah.py' -input sample.txt -output fileRead
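Before submitting the streaming job, the mapper can be sanity-checked locally, since Hadoop Streaming does little more than feed input splits to the mapper's stdin and collect its stdout. A minimal sketch of that check (the mapper logic is inlined with `-c` so the snippet is self-contained, and the input lines are made up):

```python
import subprocess
import sys

# Same line-by-line logic as the blah.py mapper above.
mapper_src = (
    "import sys\n"
    "for line in sys.stdin:\n"
    "    print(line.rstrip('\\n') + 'Blah')\n"
)

# Simulate Hadoop Streaming: pipe lines into the mapper's stdin,
# capture what it writes to stdout.
result = subprocess.run(
    [sys.executable, "-c", mapper_src],
    input="one\ntwo\n",
    capture_output=True,
    text=True,
)
print(result.stdout)
```

If each input line comes back with "Blah" appended, the mapper contract (stdin in, stdout out) is satisfied and the script is ready to hand to `-mapper`.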
uubf1zoe · answer #2

Try changing your put subprocess so that it takes its input directly from cat, by changing this:

    put = Popen(["hadoop", "fs", "-put", "-", "./modifiedfile.txt"],
                stdin=PIPE)

to this:

    put = Popen(["hadoop", "fs", "-put", "-", "./modifiedfile.txt"],
                stdin=cat.stdout)

The full script:

    #!/usr/bin/python
    from subprocess import Popen, PIPE

    print "Before Loop"
    cat = Popen(["hadoop", "fs", "-cat", "./sample.txt"],
                stdout=PIPE)
    print "After Loop 1"
    put = Popen(["hadoop", "fs", "-put", "-", "./modifiedfile.txt"],
                stdin=cat.stdout)
    put.communicate()
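The key move here is passing one process's stdout as the other's stdin, so the data flows through an OS pipe and the parent script never copies bytes in a loop. The same pattern can be sketched with placeholder commands (plain Python one-liners stand in for the hadoop calls, which may not be available locally):

```python
import subprocess
import sys

# Stand-in for "hadoop fs -cat": emit a few lines on stdout.
producer = subprocess.Popen(
    [sys.executable, "-c", "print('b'); print('a'); print('c')"],
    stdout=subprocess.PIPE,
)

# Stand-in for "hadoop fs -put -": consume stdin (here, sort the lines).
consumer = subprocess.Popen(
    [sys.executable, "-c",
     "import sys; sys.stdout.write(''.join(sorted(sys.stdin)))"],
    stdin=producer.stdout,  # wired straight to the producer, like stdin=cat.stdout
    stdout=subprocess.PIPE,
)
producer.stdout.close()  # let the producer see a broken pipe if the consumer exits
out, _ = consumer.communicate()
print(out.decode())
```

Closing `producer.stdout` in the parent after the handoff is the documented idiom for replacing a shell pipeline with `subprocess`; the original script's manual read/write loop is only needed when the parent must modify each line in flight.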
