I'm trying to run my first MapReduce job, which aggregates some data from XML files. The job fails, and since I'm new to Hadoop I'd appreciate it if someone could take a look at what went wrong.
I have:
posts_mapper.py:
#!/usr/bin/env python
import sys
import xml.etree.ElementTree as ET

input_string = sys.stdin.read()

class User(object):
    def __init__(self, id):
        self.id = id
        self.post_type_1_count = 0
        self.post_type_2_count = 0
        self.aggregate_post_score = 0
        self.aggregate_post_size = 0
        self.tags_count = {}

users = {}
root = ET.fromstring(input_string)
for child in root.getchildren():
    user_id = int(child.get("OwnerUserId"))
    post_type = int(child.get("PostTypeId"))
    score = int(child.get("Score"))
    #view_count = int(child.get("ViewCount"))
    post_size = len(child.get("Body"))
    tags = child.get("Tags")
    if user_id not in users:
        users[user_id] = User(user_id)
    user = users[user_id]
    if post_type == 1:
        user.post_type_1_count += 1
    else:
        user.post_type_2_count += 1
    user.aggregate_post_score += score
    user.aggregate_post_size += post_size
    if tags != None:
        tags = tags.replace("<", " ").replace(">", " ").split()
        for tag in tags:
            if tag not in user.tags_count:
                user.tags_count[tag] = 0
            user.tags_count[tag] += 1

for i in users:
    user = users[i]
    out = "%d %d %d %d %d " % (user.id, user.post_type_1_count, user.post_type_2_count, user.aggregate_post_score, user.aggregate_post_size)
    for tag in user.tags_count:
        out += "%s %d " % (tag, user.tags_count[tag])
    print out
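For reference, Hadoop Streaming hands each mapper only its own input split, fed line by line, so reading all of stdin into one `ET.fromstring()` call only works if every split happens to be a complete, well-formed XML document. A minimal line-oriented alternative sketch, assuming Stack Exchange-style dumps with one `<row .../>` element per line and the same attribute names as above (`map_line` and the tab-separated `user_id<TAB>score` output format are my own illustrative choices, not the poster's code):

```python
# Sketch: parse each <row .../> line independently instead of reading all of
# stdin as one XML document, so the mapper works on arbitrary input splits.
import sys
import xml.etree.ElementTree as ET

def map_line(line):
    """Turn one Stack Exchange-style <row .../> line into 'user_id<TAB>score',
    or return None for lines that are not post rows (header, <posts>, etc.)."""
    line = line.strip()
    if not line.startswith("<row"):
        return None
    try:
        elem = ET.fromstring(line)
    except ET.ParseError:
        return None  # malformed fragment: skip it rather than crash the task
    user_id = elem.get("OwnerUserId")
    score = elem.get("Score", "0")
    if user_id is None:
        return None  # some rows (e.g. posts by deleted users) lack the attribute
    # Text before the first tab is the key Hadoop Streaming shuffles on.
    return "%s\t%s" % (user_id, score)

def run(stream_in, stream_out):
    for line in stream_in:
        record = map_line(line)
        if record is not None:
            stream_out.write(record + "\n")

if __name__ == "__main__":
    run(sys.stdin, sys.stdout)
```

Emitting a real tab-separated key also lets the framework group all of one user's records onto the same reducer, instead of treating each whole output line as the key.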
posts_reducer.py:
#!/usr/bin/env python
import sys

class User(object):
    def __init__(self, id):
        self.id = id
        self.post_type_1_count = 0
        self.post_type_2_count = 0
        self.aggregate_post_score = 0
        self.aggregate_post_size = 0
        self.tags_count = {}

users = {}
for line in sys.stdin:
    vals = line.split()
    user_id = int(vals[0])
    post_type_1 = int(vals[1])
    post_type_2 = int(vals[2])
    aggregate_post_score = int(vals[3])
    aggregate_post_size = int(vals[4])
    tags = {}
    if len(vals) > 5:
        #this means we got tags
        for i in range(5, len(vals), 2):
            tag = vals[i]
            count = int(vals[i+1])
            tags[tag] = count
    if user_id not in users:
        users[user_id] = User(user_id)
    user = users[user_id]
    user.post_type_1_count += post_type_1
    user.post_type_2_count += post_type_2
    user.aggregate_post_score += aggregate_post_score
    user.aggregate_post_size += aggregate_post_size
    for tag in tags:
        if tag not in user.tags_count:
            user.tags_count[tag] = 0
        user.tags_count[tag] += tags[tag]

for i in users:
    user = users[i]
    out = "%d %d %d %d %d " % (user.id, user.post_type_1_count, user.post_type_2_count, user.aggregate_post_score, user.aggregate_post_size)
    for tag in user.tags_count:
        out += "%s %d " % (tag, user.tags_count[tag])
    print out
I run the command:
bin/hadoop jar hadoop-streaming-2.6.0.jar -input /stackexchange/beer/posts -output /stackexchange/beer/results -mapper posts_mapper.py -reducer posts_reducer.py -file ~/mapreduce/posts_mapper.py -file ~/mapreduce/posts_reducer.py
and get the output:
packageJobJar: [/home/hduser/mapreduce/posts_mapper.py, /home/hduser/mapreduce/posts_reducer.py, /tmp/hadoop-unjar6585010774815976682/] [] /tmp/streamjob88638738687983603.jar tmpdir=null
15/03/20 10:18:55 INFO client.RMProxy: Connecting to ResourceManager at master/10.1.1.22:8040
15/03/20 10:18:55 INFO client.RMProxy: Connecting to ResourceManager at master/10.1.1.22:8040
15/03/20 10:18:57 INFO mapred.FileInputFormat: Total input paths to process : 10
15/03/20 10:18:57 INFO mapreduce.JobSubmitter: number of splits:10
15/03/20 10:18:57 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1426769192808_0004
15/03/20 10:18:58 INFO impl.YarnClientImpl: Submitted application application_1426769192808_0004
15/03/20 10:18:58 INFO mapreduce.Job: The url to track the job: http://i-644dd931:8088/proxy/application_1426769192808_0004/
15/03/20 10:18:58 INFO mapreduce.Job: Running job: job_1426769192808_0004
15/03/20 10:19:11 INFO mapreduce.Job: Job job_1426769192808_0004 running in uber mode : false
15/03/20 10:19:11 INFO mapreduce.Job:  map 0% reduce 0%
15/03/20 10:19:41 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000006_0, Status : FAILED
15/03/20 10:19:48 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000007_0, Status : FAILED
15/03/20 10:19:50 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000008_0, Status : FAILED
15/03/20 10:19:50 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000009_0, Status : FAILED
15/03/20 10:20:00 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000006_1, Status : FAILED
15/03/20 10:20:08 INFO mapreduce.Job:  map 7% reduce 0%
15/03/20 10:20:10 INFO mapreduce.Job:  map 20% reduce 0%
15/03/20 10:20:10 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000007_1, Status : FAILED
15/03/20 10:20:11 INFO mapreduce.Job:  map 10% reduce 0%
15/03/20 10:20:17 INFO mapreduce.Job:  map 20% reduce 0%
15/03/20 10:20:17 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000008_1, Status : FAILED
15/03/20 10:20:19 INFO mapreduce.Job:  map 10% reduce 0%
15/03/20 10:20:19 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000009_1, Status : FAILED
15/03/20 10:20:22 INFO mapreduce.Job:  map 20% reduce 0%
15/03/20 10:20:22 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000006_2, Status : FAILED
15/03/20 10:20:25 INFO mapreduce.Job:  map 40% reduce 0%
15/03/20 10:20:25 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000002_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:322)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:535)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:450)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
15/03/20 10:20:28 INFO mapreduce.Job:  map 50% reduce 0%
15/03/20 10:20:28 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000007_2, Status : FAILED
15/03/20 10:20:42 INFO mapreduce.Job:  map 50% reduce 17%
15/03/20 10:20:52 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000008_2, Status : FAILED
15/03/20 10:20:54 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000009_2, Status : FAILED
15/03/20 10:20:56 INFO mapreduce.Job:  map 90% reduce 0%
15/03/20 10:20:57 INFO mapreduce.Job:  map 100% reduce 100%
15/03/20 10:20:58 INFO mapreduce.Job: Job job_1426769192808_0004 failed with state FAILED due to: Task failed task_1426769192808_0004_m_000006
Job failed as tasks failed. failedMaps:1 failedReduces:0
1 Answer
Unfortunately, Hadoop doesn't show the stderr of your Python mapper/reducer here, so this output gives no clue. I'd suggest the following two troubleshooting steps:
Test the mapper/reducer locally:
cat {your_input_files} | ./posts_mapper.py | sort | ./posts_reducer.py
If step 1 doesn't reveal any problem, submit the MapReduce job again and check the output logs:
yarn logs -applicationId application_1426769192808_0004
or
hdfs dfs -cat /var/log/hadoop-yarn/apps/{user}/logs/
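Since the console output hides the Python exception, one way to make it visible in the attempt logs you fetch with `yarn logs` is to catch everything at the top level of the script and print the traceback to stderr before exiting non-zero. A minimal sketch of that pattern (the `process` helper here is a hypothetical stand-in for the real per-line mapper logic, not the poster's code):

```python
# Sketch: surface Python failures in the streaming task's stderr log.
import sys
import traceback

def process(line):
    # Hypothetical placeholder for the real per-line mapper logic.
    return line.strip().upper()

def main(stream_in, stream_out):
    try:
        for line in stream_in:
            stream_out.write(process(line) + "\n")
    except Exception:
        # stderr of a streaming task is captured in the attempt's logs,
        # so the full traceback becomes visible via `yarn logs`.
        traceback.print_exc(file=sys.stderr)
        sys.exit(1)  # non-zero exit makes streaming mark the attempt FAILED

if __name__ == "__main__":
    main(sys.stdin, sys.stdout)
```

With a wrapper like this, the "subprocess failed with code 1" message in the job output is accompanied by the actual Python traceback in the task's stderr log, which usually pinpoints the failing line immediately.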