如何在python脚本中运行pscp cmd window步骤

gjmwrych  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(439)

我使用paramiko模块从Python脚本运行Hadoop MapReduce作业和其他SSH命令(代码可以在这里看到)。MapReduce作业完成后,我运行getmerge步骤将输出合并为一个文本文件。
问题是,我必须打开一个cmd窗口并运行pscp将output.txt文件从hdfs环境下载到我的计算机上。例如:

pscp xxxx@xx.xx.xx.xx:/nfs_home/appers/cnielsen/MROutput_121815_0.txt C:\Users\cnielsen\Desktop\MR_Test

如何将这个pscp步骤合并到脚本中,以便在mapreduce和getmerge任务完成后不必打开cmd窗口来运行它?我希望我的脚本能够运行mr任务,getmerge任务,然后自动将mr输出保存到我的计算机。
这是我的代码。

yvgpqqbh

yvgpqqbh1#

我用以下代码解决了这个问题。诀窍是使用scp模块并导入SCPClient。请参阅下面的scp_download(ssh)函数。
当MapReduce作业完成时,先运行getmerge命令,然后执行scp_download步骤。

import paramiko
from scp import SCPClient
import time

# Connection info for the remote host (placeholder values; fill in before use).

host_ip = 'xx.xx.xx.xx'
user = 'xxxxxxxx'
pw = 'xxxxxxxx'
port = 22

# Paths

# Input glob and output directory passed to the streaming job via -input/-output.
input_loc = '/nfs_home/appers/extracts/*/*.xml'
output_loc = '/user/lcmsprod/output/cnielsen/'
# Interpreter used to launch the mapper and reducer scripts on the remote side.
python_path = "/usr/lib/python_2.7.3/bin/python"
# Remote directory holding the job scripts; getmerge also writes its file here.
hdfs_home = '/nfs_home/appers/cnielsen/'
# Local file that stream_output() writes the remote stdout/stderr to.
output_log = r'C:\Users\cnielsen\Desktop\MR_Test\MRtest011316_0.txt'
# Local directory that scp_download() saves the merged output into.  The name
# is referenced by scp_download() but was previously never defined, which made
# the download step fail with a NameError.
local_dir = r'C:\Users\cnielsen\Desktop\MR_Test'

# File names

xml_lookup_file = 'product_lookups.xml'
mapper = 'Mapper.py'
reducer = 'Reducer.py'
helper_script = 'Process.py'
# Extra argument appended to the mapper command line.
product_name = 'test1'
# Sub-directory of output_loc that the job writes to and getmerge reads from.
output_ref = 'test65'
# Name of the merged text file created under hdfs_home by getmerge.
target_file = 'test_011416_3.txt'

# ----------------------------------------------------

def createSSHClient(host_ip, port, user, pw):
    """Open an SSH session and return the connected paramiko client.

    Args:
        host_ip: Address of the remote host.
        port: SSH port on the remote host.
        user: Login user name.
        pw: Login password.

    Returns:
        The connected ``paramiko.SSHClient``.
    """
    ssh_client = paramiko.SSHClient()
    ssh_client.load_system_host_keys()
    # Hosts missing from the known-hosts list are accepted and added
    # automatically instead of being rejected.
    ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh_client.connect(hostname=host_ip, port=port, username=user, password=pw)
    return ssh_client

# ----------------------------------------------------

def buildMRcommand(product_name):
    """Assemble, print and return the hadoop-streaming command line.

    The command is built from the module-level path and file-name settings.

    Args:
        product_name: Extra argument appended to the mapper invocation.

    Returns:
        The full command as a single space-separated string.
    """
    # Mapper and reducer invocations are each wrapped in single quotes so the
    # remote shell hands them to hadoop as one argument.
    mapper_call = "'%s %s %s'" % (python_path, mapper, product_name)
    reducer_call = "'%s %s'" % (python_path, reducer)

    pieces = ['hadoop', 'jar', '/share/hadoop/tools/lib/hadoop-streaming.jar']
    pieces += ['-files', hdfs_home + xml_lookup_file]
    pieces += ['-file', hdfs_home + mapper]
    pieces += ['-file', hdfs_home + reducer]
    pieces += ['-mapper', mapper_call]
    pieces += ['-file', hdfs_home + helper_script]
    pieces += ['-reducer', reducer_call]
    pieces += ['-input', input_loc]
    pieces += ['-output', output_loc + output_ref]

    MR_command = " ".join(pieces)
    print(MR_command)
    return MR_command

# ----------------------------------------------------

def unbuffered_lines(f):
    """Yield lines from a remote stream as soon as each one is complete.

    The stream is read one character at a time while the remote command is
    still running, so a line is yielded the moment its newline arrives.  Once
    the command has exited (or the stream reports end-of-file), whatever is
    still unread is drained and yielded too, so trailing output and a final
    line without a newline are not lost.

    Args:
        f: File-like object offering ``read(size)``/``read()`` and a
            ``channel`` attribute with ``exit_status_ready()``.
            Assumes ``read`` returns ``str`` (Python 2 paramiko) -- TODO
            confirm if this is ever run where it returns bytes.

    Yields:
        Each line including its trailing newline; the last item may lack one.
    """
    line_buf = ""
    while not f.channel.exit_status_ready():
        ch = f.read(1)
        if not ch:
            # Empty read means end-of-file; stop instead of spinning until the
            # exit status shows up.
            break
        line_buf += ch
        if line_buf.endswith('\n'):
            yield line_buf
            line_buf = ""

    # The command has finished, but data may still be sitting in the stream.
    line_buf += f.read()
    for line in line_buf.splitlines(True):
        yield line

# ----------------------------------------------------

def stream_output(stdin, stdout, stderr):
    """Echo a remote command's output to the console and to ``output_log``.

    stderr is consumed first through ``unbuffered_lines`` so it is shown while
    the command runs; stdout is then read to the end.  Every line is prefixed
    with its stream name, printed, and written to the log file.

    Args:
        stdin: Unused; accepted so the result of ``exec_command`` can be passed
            straight through.
        stdout: Iterable of the command's stdout lines.
        stderr: Stream of the command's stderr, as accepted by
            ``unbuffered_lines``.
    """
    # The context manager closes the log even if reading or writing raises.
    with open(output_log, 'w') as writer:
        for err_line in unbuffered_lines(stderr):
            print('[stderr] ' + err_line.strip('\n'))
            writer.write('[stderr] ' + err_line)

        # Remaining stdout is read in full once stderr is done.
        for out_line in stdout:
            print('[stdout]' + out_line.strip('\n'))
            writer.write('[stdout]' + out_line)

# ----------------------------------------------------

def run_MapReduce(ssh):
    """Run the streaming MapReduce job remotely and report whether it succeeded.

    Args:
        ssh: Connected SSH client offering ``exec_command``.

    Returns:
        1 if the remote command exited with status 0, otherwise 0.  (It used to
        return 1 unconditionally, so later steps ran even after a failed job.)
    """
    stdin, stdout, stderr = ssh.exec_command(buildMRcommand(product_name))
    stream_output(stdin, stdout, stderr)

    # Output has been drained above, so waiting for the exit status here
    # cannot stall on a full channel buffer.
    exit_code = stdout.channel.recv_exit_status()
    if exit_code != 0:
        print('MapReduce job failed with exit status %d' % exit_code)
        return 0
    return 1

# ----------------------------------------------------

def run_list_dir(ssh):
    """Run ``ls -l`` on ``hdfs_home`` remotely and stream the listing.

    Args:
        ssh: Connected SSH client offering ``exec_command``.
    """
    command = "ls %s -l" % hdfs_home
    streams = ssh.exec_command(command)
    # exec_command yields (stdin, stdout, stderr), the order stream_output takes.
    stream_output(*streams)

# ----------------------------------------------------

def run_getmerge(ssh):
    """Merge the job's output into one text file on the remote host.

    Runs ``hadoop fs -getmerge`` from ``output_loc + output_ref`` into
    ``hdfs_home + target_file`` and echoes the command's stdout.

    Args:
        ssh: Connected SSH client offering ``exec_command``.

    Returns:
        1 if the remote command exited with status 0, otherwise 0.  (It used to
        return 1 unconditionally, so a failed merge still triggered the
        download step.)
    """
    getmerge = "hadoop fs -getmerge "+output_loc+output_ref+" "+hdfs_home+target_file
    print(getmerge)
    stdin, stdout, stderr = ssh.exec_command(getmerge)
    for line in stdout:
        print('[stdout]' + line.strip('\n'))

    exit_code = stdout.channel.recv_exit_status()
    if exit_code != 0:
        # Show why the merge failed instead of silently discarding stderr.
        for line in stderr:
            print('[stderr] ' + line.strip('\n'))
        print('getmerge failed with exit status %d' % exit_code)
        return 0

    # NOTE(review): short pause kept from the original; presumably it gives the
    # merged file time to become visible before the download -- confirm.
    time.sleep(1.5)
    return 1

# ----------------------------------------------------

def scp_download(ssh):
    """Copy the merged output file from the remote host via SCP.

    Fetches ``hdfs_home + target_file`` into ``local_dir`` over the transport
    of the given SSH connection.

    NOTE(review): ``local_dir`` is read as a module-level name but is not
    defined in the visible code -- confirm it is set before calling.

    Args:
        ssh: Connected SSH client offering ``get_transport``.
    """
    remote_file = hdfs_home + target_file
    scp_client = SCPClient(ssh.get_transport())
    print("Fetching SCP data..")
    scp_client.get(remote_file, local_dir)
    print("File download complete.")

# ----------------------------------------------------

def main():
    """Run the whole pipeline: MapReduce job, getmerge, then SCP download.

    Each step runs only if the previous one returned 1.  The SSH connection is
    closed on the way out even when a step raises.
    """
    # The connection is still published as a module-level name, as before.
    global ssh
    ssh = createSSHClient(host_ip, port, user, pw)
    try:
        print("Executing command...")

        # Individual steps can also be called on their own for debugging:
        # run_list_dir(ssh), run_getmerge(ssh), scp_download(ssh).
        if run_MapReduce(ssh) == 1 and run_getmerge(ssh) == 1:
            scp_download(ssh)
    finally:
        # Close the ssh connection whether or not the steps succeeded.
        ssh.close()


if __name__ == '__main__':
    main()

相关问题