Hadoop combiner not writing to reducer

Posted by r8uurelv on 2021-05-30 in Hadoop

Everything works fine when I run the following locally:

cat data | mapper.py | sort | combiner.py | reducer.py

But when I run this in Hadoop, the combiner keeps running without sending any output to the reducer, and the job eventually gets killed.

The logs show "java.io.IOException: Bad file descriptor" and "WARN org.apache.hadoop.streaming.PipeMapRed: java.io.IOException: Broken pipe". This only happens when I run with the combiner, not when I run with just the mapper and reducer.
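
A local pipe like the one above runs the combiner exactly once over fully sorted input. Hadoop may apply a combiner zero, one, or several times, so a local check is closer to the cluster if it compares the final output across different numbers of combiner passes. The sketch below does that in memory with a toy sum-per-key stage. The names `sum_counts` and `run_job` are hypothetical, the code is written for Python 3, and it does not model how real map output is split across tasks.

```python
from itertools import groupby


def sum_counts(lines):
    """Toy combiner/reducer: sum the counts of sorted 'key<TAB>count' lines."""
    parsed = (line.split("\t") for line in lines)
    for key, group in groupby(parsed, key=lambda fields: fields[0]):
        yield "{0}\t{1}".format(key, sum(int(count) for _, count in group))


def run_job(map_output, combiner, reducer, combiner_passes):
    """Simulate map output -> combiner (N passes) -> sort -> reducer."""
    lines = sorted(map_output)
    for _ in range(combiner_passes):
        lines = sorted(combiner(lines))
    return list(reducer(sorted(lines)))


map_output = ["b\t1", "a\t1", "b\t1", "a\t1", "a\t1"]

# Run the same job with 0, 1 and 2 combiner passes.
results = [run_job(map_output, sum_counts, sum_counts, n) for n in range(3)]
print(results[0] == results[1] == results[2])
```

If the results differ between pass counts, the combiner is changing the data in a way the reducer does not expect, which a single local pipe would not reveal.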
Mapper:


#!/usr/bin/python

import sys
import csv
import re

# Compiled once: a token counts as a word if it starts with a lowercase letter.
WORD = re.compile(r'[a-z]+')

def mapper():
    reader = csv.reader(sys.stdin, delimiter='\t')

    for line in reader:
        # Skip the header row.
        if line[0] == "id":
            continue
        doc_id = line[0]
        body = line[4].strip()

        # Emit one "word<TAB>doc_id<TAB>1" record per word occurrence.
        for token in re.split(r'\W+', body):
            word = token.lower()
            if WORD.match(word):
                print "{0}\t{1}\t{2}".format(word, doc_id, 1)

mapper()
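
To make the mapper's record format concrete, here is a small sketch of the same tokenizing logic applied to a single row. It is written for Python 3 rather than the Python 2 of the script above. The sample row is hypothetical: only column 0 (the id) and column 4 (the body) are taken from the mapper, and the other columns are left empty.

```python
import re

WORD = re.compile(r"[a-z]+")


def map_row(row):
    """Return the 'word<TAB>doc_id<TAB>1' records for one parsed input row."""
    doc_id, body = row[0], row[4].strip()
    records = []
    for token in re.split(r"\W+", body):
        word = token.lower()
        # Skips empty tokens and tokens that do not start with a-z.
        if WORD.match(word):
            records.append("{0}\t{1}\t{2}".format(word, doc_id, 1))
    return records


# Hypothetical row: only columns 0 (id) and 4 (body) are used.
sample_row = ["7", "", "", "", "Hadoop pipes: broken pipe!"]
records = map_row(sample_row)
for record in records:
    print(record)
```

Every record has exactly three tab-separated fields, which is the shape both the combiner and the reducer check for.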

Combiner:

#!/usr/bin/python

import sys

oldKey = None
old_doc = None
doc_count = 0

for line in sys.stdin:
    data_mapped = line.strip().split("\t")
    if len(data_mapped) != 3:
        # Something has gone wrong. Skip this line.
        continue

    thisKey, thisDoc, nos = data_mapped

    # New word: flush the count for the previous (word, doc) pair.
    if oldKey and oldKey != thisKey:
        print oldKey, "\t", old_doc, "\t", str(doc_count)
        doc_count = 0
        old_doc = None

    oldKey = thisKey

    if old_doc is None:
        # First record for this word.
        old_doc = thisDoc
        doc_count += 1
    elif thisDoc == old_doc:
        # Same (word, doc) pair: keep counting records.
        doc_count += 1
    else:
        # Same word, new doc: flush and restart the count.
        print oldKey, "\t", old_doc, "\t", str(doc_count)
        old_doc = thisDoc
        doc_count = 1

if oldKey is not None:
    print oldKey, "\t", old_doc, "\t", str(doc_count)
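
Two details of the combiner above may be worth checking, though the question does not establish that either one causes the Broken pipe error. First, a Python 2 print statement with comma-separated items writes a space after the word before it writes the following tab, so the emitted key is the word plus a trailing space rather than the word the mapper emitted. Second, the combiner counts input records and ignores the incoming count field, so running it over its own output would lose counts. The sketch below, written for Python 3 with the hypothetical names `format_record` and `combine`, joins fields with bare tabs and sums the incoming counts.

```python
def format_record(*fields):
    """Join fields with single tabs and no padding."""
    return "\t".join(str(field) for field in fields)


def combine(lines):
    """Sum counts per (word, doc) over sorted 'word<TAB>doc<TAB>count' lines."""
    current, total = None, 0
    for line in lines:
        fields = line.rstrip("\n").split("\t")
        if len(fields) != 3:
            continue  # skip malformed records, as the original scripts do
        word, doc, count = fields
        if current is not None and current != (word, doc):
            yield format_record(current[0], current[1], total)
            total = 0
        current = (word, doc)
        total += int(count)  # add the incoming count instead of counting lines
    if current is not None:
        yield format_record(current[0], current[1], total)


# Example of a record with a space before each tab: the key keeps the space.
padded = "hadoop \t7 \t2"
key = padded.strip().split("\t")[0]

once = list(combine(["hadoop\t7\t1", "hadoop\t7\t1", "hadoop\t9\t1"]))
twice = list(combine(once))
print(repr(key), once, twice == once)
```

Because the output has the same format as the input and the counts are summed, applying `combine` a second time leaves the records unchanged.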

Reducer:

#!/usr/bin/python

import sys

oldKey = None
doc_list = []
doc_count = 0

for line in sys.stdin:
    data_mapped = line.strip().split("\t")
    if len(data_mapped) != 3:
        # Something has gone wrong. Skip this line.
        continue

    thisKey, thisDoc, nos = data_mapped

    # New word: emit the finished word's sorted doc list and total count.
    if oldKey and oldKey != thisKey:
        print oldKey, "\t", sorted(doc_list), "\t", doc_count
        doc_list = []
        doc_count = 0

    oldKey = thisKey
    doc_count += int(nos)
    # Record each document id only once per word.
    if int(thisDoc) not in doc_list:
        doc_list.append(int(thisDoc))

if oldKey is not None:
    print oldKey, "\t", sorted(doc_list), "\t", doc_count

The problem is similar to an "inverted index": the final output should be <word, [list of docs], count>.
Any help would be appreciated.
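
For reference, the intended <word, [list of docs], count> result can be computed in memory for a tiny input, which gives something to compare the streaming job's output against. This is only a sketch in Python 3. The function name `inverted_index` and the sample records are hypothetical.

```python
from collections import defaultdict


def inverted_index(records):
    """Build {word: (sorted doc ids, total count)} from (word, doc_id, count)."""
    docs = defaultdict(set)
    totals = defaultdict(int)
    for word, doc_id, count in records:
        docs[word].add(doc_id)
        totals[word] += count
    return {word: (sorted(docs[word]), totals[word]) for word in docs}


# Hypothetical mapper output, already parsed into tuples.
records = [("hadoop", 7, 1), ("hadoop", 7, 1), ("hadoop", 9, 1), ("pipe", 7, 1)]
index = inverted_index(records)

for word in sorted(index):
    doc_ids, total = index[word]
    print("{0}\t{1}\t{2}".format(word, doc_ids, total))
```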
