Hadoop streaming job fails

ffscu2ro posted on 2021-05-30 in Hadoop

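I'm trying to run my first MapReduce job, which aggregates some data from XML files. The job fails, and since I'm new to Hadoop, I'd appreciate it if someone could take a look at what is going wrong.
I have:
posts_mapper.py: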


#!/usr/bin/env python

import sys
import xml.etree.ElementTree as ET

input_string = sys.stdin.read()

class User(object):

        def __init__(self, id):
                self.id = id
                self.post_type_1_count = 0
                self.post_type_2_count = 0
                self.aggregate_post_score = 0
                self.aggregate_post_size = 0
                self.tags_count = {}

users = {}
root = ET.fromstring(input_string)
for child in root.getchildren():
        user_id = int(child.get("OwnerUserId"))
        post_type = int(child.get("PostTypeId"))
        score = int(child.get("Score"))
        #view_count = int(child.get("ViewCount"))
        post_size = len(child.get("Body"))
        tags = child.get("Tags")

        if user_id not in users:
                users[user_id] =  User(user_id)
        user = users[user_id]
        if post_type == 1:
                user.post_type_1_count += 1
        else:
                user.post_type_2_count += 1
        user.aggregate_post_score += score
        user.aggregate_post_size += post_size

        if tags != None:
                tags = tags.replace("<", " ").replace(">", " ").split()
                for tag in tags:
                        if tag not in user.tags_count:
                                user.tags_count[tag] = 0
                        user.tags_count[tag] += 1

for i in users:
        user = users[i]
        out = "%d %d %d %d %d " % (user.id, user.post_type_1_count, user.post_type_2_count, user.aggregate_post_score, user.aggregate_post_size)
        for tag in user.tags_count:
                out += "%s %d " % (tag, user.tags_count[tag])
        print out

posts_reducer.py:


#!/usr/bin/env python

import sys

class User(object):

        def __init__(self, id):
                self.id = id
                self.post_type_1_count = 0
                self.post_type_2_count = 0
                self.aggregate_post_score = 0
                self.aggregate_post_size = 0
                self.tags_count = {}

users = {}
for line in sys.stdin:

        vals = line.split()
        user_id = int(vals[0])
        post_type_1 = int(vals[1])
        post_type_2 = int(vals[2])
        aggregate_post_score = int(vals[3])
        aggregate_post_size = int(vals[4])
        tags = {}
        if len(vals) > 5:
                #this means we got tags
                for i in range (5, len(vals), 2):
                        tag = vals[i]
                        count = int((vals[i+1]))
                        tags[tag] = count

        if user_id not in users:
                users[user_id] = User(user_id)
        user = users[user_id]
        user.post_type_1_count += post_type_1
        user.post_type_2_count += post_type_2
        user.aggregate_post_score += aggregate_post_score
        user.aggregate_post_size += aggregate_post_size
        for tag in tags:
                if tag not in user.tags_count:
                        user.tags_count[tag] = 0
                user.tags_count[tag] += tags[tag]

for i in users:
        user = users[i]
        out = "%d %d %d %d %d " % (user.id, user.post_type_1_count, user.post_type_2_count, user.aggregate_post_score, user.aggregate_post_size)
        for tag in user.tags_count:
                out += "%s %d " % (tag, user.tags_count[tag])
        print out

I run the command:

bin/hadoop jar hadoop-streaming-2.6.0.jar -input /stackexchange/beer/posts -output /stackexchange/beer/results -mapper posts_mapper.py -reducer posts_reducer.py -file ~/mapreduce/posts_mapper.py -file ~/mapreduce/posts_reducer.py

and get the output:
packageJobJar: [/home/hduser/mapreduce/posts_mapper.py, /home/hduser/mapreduce/posts_reducer.py, /tmp/hadoop-unjar6585010774815976682/] [] /tmp/streamjob88638738687983603.jar tmpDir=null
15/03/20 10:18:55 INFO client.RMProxy: Connecting to ResourceManager at master/10.1.1.22:8040
15/03/20 10:18:55 INFO client.RMProxy: Connecting to ResourceManager at master/10.1.1.22:8040
15/03/20 10:18:57 INFO mapred.FileInputFormat: Total input paths to process : 10
15/03/20 10:18:57 INFO mapreduce.JobSubmitter: number of splits:10
15/03/20 10:18:57 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1426769192808_0004
15/03/20 10:18:58 INFO impl.YarnClientImpl: Submitted application application_1426769192808_0004
15/03/20 10:18:58 INFO mapreduce.Job: The url to track the job: http://i-644dd931:8088/proxy/application_1426769192808_0004/
15/03/20 10:18:58 INFO mapreduce.Job: Running job: job_1426769192808_0004
15/03/20 10:19:11 INFO mapreduce.Job: Job job_1426769192808_0004 running in uber mode : false
15/03/20 10:19:11 INFO mapreduce.Job:  map 0% reduce 0%
15/03/20 10:19:41 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000006_0, Status : FAILED
15/03/20 10:19:48 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000007_0, Status : FAILED
15/03/20 10:19:50 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000008_0, Status : FAILED
15/03/20 10:19:50 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000009_0, Status : FAILED
15/03/20 10:20:00 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000006_1, Status : FAILED
15/03/20 10:20:08 INFO mapreduce.Job:  map 7% reduce 0%
15/03/20 10:20:10 INFO mapreduce.Job:  map 20% reduce 0%
15/03/20 10:20:10 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000007_1, Status : FAILED
15/03/20 10:20:11 INFO mapreduce.Job:  map 10% reduce 0%
15/03/20 10:20:17 INFO mapreduce.Job:  map 20% reduce 0%
15/03/20 10:20:17 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000008_1, Status : FAILED
15/03/20 10:20:19 INFO mapreduce.Job:  map 10% reduce 0%
15/03/20 10:20:19 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000009_1, Status : FAILED
15/03/20 10:20:22 INFO mapreduce.Job:  map 20% reduce 0%
15/03/20 10:20:22 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000006_2, Status : FAILED
15/03/20 10:20:25 INFO mapreduce.Job:  map 40% reduce 0%
15/03/20 10:20:25 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000002_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:322)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:535)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:450)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
15/03/20 10:20:28 INFO mapreduce.Job:  map 50% reduce 0%
15/03/20 10:20:28 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000007_2, Status : FAILED
15/03/20 10:20:42 INFO mapreduce.Job:  map 50% reduce 17%
15/03/20 10:20:52 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000008_2, Status : FAILED
15/03/20 10:20:54 INFO mapreduce.Job: Task Id : attempt_1426769192808_0004_m_000009_2, Status : FAILED
15/03/20 10:20:56 INFO mapreduce.Job:  map 90% reduce 0%
15/03/20 10:20:57 INFO mapreduce.Job:  map 100% reduce 100%
15/03/20 10:20:58 INFO mapreduce.Job: Job job_1426769192808_0004 failed with state FAILED due to: Task failed task_1426769192808_0004_m_000006
Job failed as tasks failed. failedMaps:1 failedReduces:0

Answer 1 (kx1ctssn):

Unfortunately, Hadoop does not show the stderr of your Python mapper/reducer here, so this output gives no clue about the cause.
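Because the client output only reports the exit code, it can help to make the script's own error message easy to find once the task logs are in hand. The following is a minimal sketch, not part of the question's scripts: the helper name `run_guarded`, the marker text, and the Python 3 syntax are all assumptions. An uncaught Python exception already prints a traceback to stderr and exits with status 1, so the wrapper only adds a marker line to search for.

```python
import sys
import traceback


def run_guarded(main, stream=None):
    """Call main(); on any exception write a marker line plus the traceback and return 1."""
    stream = stream if stream is not None else sys.stderr
    try:
        main()
    except Exception:
        stream.write("MAPPER_FAILED\n")  # hypothetical marker, chosen to be easy to grep for
        traceback.print_exc(file=stream)
        return 1
    return 0


# Hypothetical usage at the bottom of a streaming script, where process_stdin
# would be a function holding the script body:
# sys.exit(run_guarded(process_stdin))
```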
I would recommend the following two troubleshooting steps:
1. Test the mapper/reducer locally:
   cat {your_input_files} | ./posts_mapper.py | sort | ./posts_reducer.py
2. If step 1 does not reveal any problem, create the MapReduce job and check the output logs:
   yarn logs -applicationId application_1426769192808_0004
   or
   hdfs dfs -cat /var/log/hadoop-yarn/apps/{user}/logs/
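The local test of the mapper and reducer can also be scripted, so that the failing stage, its exit code, and its stderr are captured in one place. This is a rough sketch under stated assumptions: it is written for Python 3, the function name `run_local_pipeline` is made up for illustration, and a byte-wise line sort stands in for the shell's `sort`.

```python
import subprocess


def run_local_pipeline(mapper_cmd, reducer_cmd, input_bytes):
    """Emulate `cat input | mapper | sort | reducer` on one machine.

    Returns (stage, returncode, stdout, stderr), where stage names the last
    command that ran ("mapper" or "reducer").
    """
    mapped = subprocess.run(mapper_cmd, input=input_bytes,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if mapped.returncode != 0:
        # Local equivalent of "subprocess failed with code N": stop and report.
        return "mapper", mapped.returncode, mapped.stdout, mapped.stderr

    # Stand-in for the sort step: plain byte-wise ordering of the mapper's lines.
    lines = sorted(mapped.stdout.splitlines())
    shuffled = b"".join(line + b"\n" for line in lines)

    reduced = subprocess.run(reducer_cmd, input=shuffled,
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return "reducer", reduced.returncode, reduced.stdout, reduced.stderr


# Hypothetical usage (the interpreter name and the input file name are assumptions):
# with open("Posts.xml", "rb") as f:
#     stage, code, out, err = run_local_pipeline(
#         ["python", "posts_mapper.py"], ["python", "posts_reducer.py"], f.read())
# print(stage, code, err.decode(errors="replace"))
```

If the returned stage is "mapper" with a non-zero code, the stderr bytes should contain the Python traceback that the job's client output does not show.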
