合并mapreduce作业的输出文件

6ovsh4lw  于 2021-06-04  发布在  Hadoop
关注(0)|答案(4)|浏览(493)

我用python编写了一个mapper和reducer,并使用hadoop流在amazon的elasticmapreduce(emr)上成功地执行了它。
最终结果文件夹包含三个不同文件part-00000、part-00001和part-00002中的输出。但我需要一个文件的输出。有办法吗?
以下是我的Map程序代码:


# !/usr/bin/env python

import sys

for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print '%s\t%s' % (word, 1)

这是我的减速机代码


# !/usr/bin/env python

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None
max_count=0

for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)

    try:
        count = int(count)
    except ValueError:
        continue

if current_word == word:
    current_count += count
else:
    if current_word:
        # write result to STDOUT
            if current_word[0] != '@':
                print '%s\t%d' % (current_word, current_count)
                if count > max_count:
                    max_count = count
    current_count = count
    current_word = word

if current_word == word:
    print '%s\t%s' % (current_word, current_count)

我需要这个输出作为一个单独的文件。

3htmauhk

3htmauhk1#

我最近也遇到了同样的问题,实际上combiner应该完成这个任务,但是我无法实现。我所做的是;
步骤1:mapper1.py减速器1.py
输入:s3://../data/
输出s3://../small\u输出/
第二步:mapper2.py减速器2.py
输入s3://../数据/
输出:s3://../output2/
第三步:mapper3.py减速器3.py
输入:s3://../output2/
输出:s3://../最终输出/
我假设我们需要step1的输出作为step3的单个文件。
在mapper2.py的顶部,有这样的代码;

if not os.path.isfile('/tmp/s3_sync_flag'):
    os.system('touch /tmp/s3_sync_flag')
    [download files to /tmp/output/]
    os.system('cat /tmp/output/part* > /tmp/output/all')

如果是块,则检查是否有多个Map器执行。

mctunoxg

mctunoxg2#

一种非常简单的方法(假设是linux/unix系统):

$ cat part-00000 part-00001 part-00002 > output
m0rkklqb

m0rkklqb3#

对小数据集/处理使用单个reduce,或对作业的输出文件使用getmerge选项。

k5hmc34c

k5hmc34c4#

我对上述问题的解决方案是执行以下hdfs命令:

hadoop fs -getmerge /hdfs/path local_file

其中/hdfs/path是包含作业输出的所有部分(part-*****)的路径。hadoop fs的-getmerge选项将所有作业输出合并到本地文件系统上的单个文件中。

相关问题