当我执行以下操作时,本地的一切都正常:
Cat data | mapper.py | sort | combiner.py | reducer.py but when I ran this in Hadoop - combiner keeps on running without sending any output to reducer. Finally job gets killed.
显示 "Java.io.IOException: Bad file descriptor" and "WARN org.apache.hadoop.streaming.PipeMapRed: java.io.IOException: Broken pipe"
这只发生在我用combiner运行时,而不是用mapper和reducer运行时。
Map器:
# !/usr/bin/python
import sys
import csv
import re
def mapper():
reader = csv.reader(sys.stdin, delimiter='\t')
for line in reader:
if line[0] =="id":
continue
body = line[4].strip()
ids =line[0]
#print body
body_split =re.split('\W+', body)
for i in range(len(body_split)):
p =re.compile('[a-z]+')
if p.match(body_split[i].lower()):
print "{0}\t{1}\t{2}".format(body_split[i].lower(), line[0],1)
mapper()
合路器:
enter code here
# !/usr/bin/python
import sys
oldKey =None
old_doc =None
doc_count =0
for line in sys.stdin:
data_mapped = line.strip().split("\t")
if len(data_mapped) != 3:
# Something has gone wrong. Skip this line.
continue
thisKey, thisDoc,nos = data_mapped
if oldKey and oldKey != thisKey:
print oldKey, "\t", old_doc,"\t",str(doc_count)
doc_list = []
doc_count =0
old_doc=None
oldKey = thisKey
if old_doc ==None:
old_doc =thisDoc
doc_count +=int(1)
continue
if old_doc !=None:
if thisDoc ==old_doc:
doc_count +=int(1)
continue
else:
print oldKey, "\t", old_doc,"\t",str(doc_count)
old_doc =thisDoc
doc_count =0
doc_count +=int(1)
if oldKey != None:
print oldKey, "\t", old_doc,"\t",str(doc_count)
减速器:
enter code here
# !/usr/bin/python
import sys
oldKey =None
doc_list = []
doc_count =0
for line in sys.stdin:
data_mapped = line.strip().split("\t")
if len(data_mapped) != 3:
# Something has gone wrong. Skip this line.
continue
thisKey, thisDoc,nos = data_mapped
if oldKey and oldKey != thisKey:
print oldKey, "\t", sorted(doc_list), "\t",doc_count
doc_list = []
doc_count =0
oldKey=None
oldKey = thisKey
doc_count +=int(nos)
if int(thisDoc) not in doc_list:
doc_list.append(int(thisDoc))
#print doc_list
if oldKey != None:
print oldKey, "\t", sorted(doc_list), "\t",doc_count
问题类似于“倒排索引”-最终输出将是<word,[list of docs],count>
任何帮助都是好的。
暂无答案!
目前还没有任何答案,快来回答吧!