mapreduce和paramiko如何在流媒体时打印stdout

u4dcyp6a  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(464)

我使用paramiko创建了一个小python脚本,它允许我运行mapreduce作业,而不使用putty或cmd窗口来启动作业。这很好用,只是在工作完成之前我不能看到stdout。我如何设置它,使我可以看到每一行的标准输出,因为它是生成的,就像我将能够通过命令窗口?
这是我的剧本:

  1. import paramiko
  2. # Define connection info
  3. host_ip = 'xx.xx.xx.xx'
  4. user = 'xxxxxxxxx'
  5. pw = 'xxxxxxxxx'
  6. # Commands
  7. list_dir = "ls /nfs_home/appers/cnielsen -l"
  8. MR = "hadoop jar /opt/cloudera/parcels/CDH/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming.jar -files /nfs_home/appers/cnielsen/product_lookups.xml -file /nfs_home/appers/cnielsen/Mapper.py -file /nfs_home/appers/cnielsen/Reducer.py -mapper '/usr/lib/python_2.7.3/bin/python Mapper.py test1' -file /nfs_home/appers/cnielsen/Process.py -reducer '/usr/lib/python_2.7.3/bin/python Reducer.py' -input /nfs_home/appers/extracts/*/*.xml -output /user/loc/output/cnielsen/test51"
  9. getmerge = "hadoop fs -getmerge /user/loc/output/cnielsen/test51 /nfs_home/appers/cnielsen/test_010716_0.txt"
  10. client = paramiko.SSHClient()
  11. client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  12. client.connect(host_ip, username=user, password=pw)
  13. ## stdin, stdout, stderr = client.exec_command(list_dir)
  14. ## stdin, stdout, stderr = client.exec_command(getmerge)
  15. stdin, stdout, stderr = client.exec_command(MR)
  16. print "Executing command..."
  17. for line in stdout:
  18. print '... ' + line.strip('\n')
  19. for l in stderr:
  20. print '... ' + l.strip('\n')
  21. client.close()
9udxz4iz

9udxz4iz1#

这段代码隐式地调用stdout.read(),直到eof。因此,您必须分块读取stdout/stderr以立即获得输出。这个答案,尤其是这个答案的一个修改版本应该可以帮助你解决这个问题。我建议根据您的用例调整答案2,以防止一些常见的延迟场景。
下面是一个改编自答案1的例子

  1. sin,sout,serr = ssh.exec_command("while true; do uptime; done")
  2. def line_buffered(f):
  3. line_buf = ""
  4. while not f.channel.exit_status_ready():
  5. line_buf += f.read(1)
  6. if line_buf.endswith('\n'):
  7. yield line_buf
  8. line_buf = ''
  9. for l in line_buffered(sout): # or serr
  10. print l

相关问题