Pipe command works, but MapReduce does not

0sgqnhkj · posted 2021-06-03 · in Hadoop
Follow (0) | Answers (3) | Views (460)

I have to join 6 sets of data relating to the view counts of certain TV shows on different channels. 3 of the 6 sets contain a list of shows and the view count for each show, e.g.:

Show_Name 201
Another_Show 105
etc...

The other 3 sets contain the show and the channel it airs on, e.g.:

Show_Name ABC
Another_Show CNN
etc...

I wrote the following mapper in Python to find the views for the shows on ABC:


#!/usr/bin/env python

import sys

all_shows_views = []
shows_on_ABC = []

for line in sys.stdin:
    line      = line.strip()      # strip out carriage return (i.e. removes line breaks)
    key_value = line.split(",")   # split line into key and value, returns a list
    key_in    = key_value[0]      # key is the 1st item; no date field, so no further split needed
    value_in  = key_value[1]      # value is the 2nd item

    if value_in.isdigit():        # a numeric value means this line is a view count
        show = key_in
        all_shows_views.append(show + "\t" + value_in)
    if value_in == "ABC":         # otherwise check whether the channel is ABC
        show = key_in
        shows_on_ABC.append(show)

for i in range(len(all_shows_views)):
    show_view = all_shows_views[i].split("\t")
    for c in range(len(shows_on_ABC)):
        if show_view[0] == shows_on_ABC[c]:
            print (show_view[0] + "\t" + show_view[1])

# Note that Hadoop expects a tab to separate the key and value,
# but this program assumes the input file uses ',' between them.

The mapper passes along only the show names on ABC and their view counts, e.g.:

Show_Name_on_ABC 120

The reducer in Python looks like this:

import sys

prev_show = "  "   # initialize previous show to a blank string
line_cnt  = 0      # count input lines
count     = 0      # keep a running total

for line in sys.stdin:
    line       = line.strip()           #strip out carriage return
    key_value  = line.split('\t')       #split line, into key and value, returns a list
    line_cnt   = line_cnt+1   
    curr_show  = key_value[0]             #key is first item in list, indexed by 0
    value_in   = key_value[1]             #value is 2nd item

    if curr_show != prev_show and line_cnt > 1:
        # a new show has started, so emit the total for the previous show
        print (prev_show + "\t" + str(count))
        count = 0
    else:
        count = count + int(key_value[1])

    prev_show = curr_show   # set up previous show for the next input line

print (curr_show + "\t" + str(count))

The reducer takes the list of shows on ABC and their view counts, keeps a running total for each show, and prints out the total per show (Hadoop automatically sorts the data alphabetically by key before the reducer sees it; the key here is the show name).
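For comparison, the same aggregation can be sketched more compactly with itertools.groupby, leaning on exactly that sorted-by-key guarantee (this is a sketch, not the code I ran):

#!/usr/bin/env python
import sys
from itertools import groupby

def parse(stream):
    for line in stream:
        show, views = line.strip().split("\t")
        yield show, int(views)

# groupby works here only because the shuffle (or `sort` in the pipe test)
# has already placed all lines for one show next to each other.
for show, rows in groupby(parse(sys.stdin), key=lambda kv: kv[0]):
    print(show + "\t" + str(sum(v for _, v in rows)))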
When I run this in the terminal with a piping command, like so:

cat Data*.text | /home/cloudera/mapper.py | sort | /home/cloudera/reducer.py
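(Here sort stands in for Hadoop's shuffle-and-sort phase, which groups all lines with the same key together before they reach the reducer.)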

I get a neat output with the correct totals, like this:

Almost_Games 49237
Almost_News 45589
Almost_Show 49186
Small_Games 50603
When I run this with Hadoop in the terminal, using the following command:

> hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
   -input /user/cloudera/input \
   -output /user/cloudera/output_join \   
   -mapper /home/cloudera/mapper.py \   
   -reducer /home/cloudera/reducer.py

I get a failure, and the reducer is the culprit. The full error is below:

15/11/15 09:16:54 INFO mapreduce.Job: Job job_1447598349691_0003 failed with state FAILED due to: Task failed task_1447598349691_0003_r_000000
Job failed as tasks failed. failedMaps:0 failedReduces:1

15/11/15 09:16:54 INFO mapreduce.Job: Counters: 37
    File System Counters
        FILE: Number of bytes read=0
        FILE: Number of bytes written=674742
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=113784
        HDFS: Number of bytes written=0
        HDFS: Number of read operations=18
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=0
    Job Counters 
        Failed reduce tasks=4
        Launched map tasks=6
        Launched reduce tasks=4
        Data-local map tasks=6
        Total time spent by all maps in occupied slots (ms)=53496
        Total time spent by all reduces in occupied slots (ms)=18565
        Total time spent by all map tasks (ms)=53496
        Total time spent by all reduce tasks (ms)=18565
        Total vcore-seconds taken by all map tasks=53496
        Total vcore-seconds taken by all reduce tasks=18565
        Total megabyte-seconds taken by all map tasks=54779904
        Total megabyte-seconds taken by all reduce tasks=19010560
    Map-Reduce Framework
        Map input records=6600
        Map output records=0
        Map output bytes=0
        Map output materialized bytes=36
        Input split bytes=729
        Combine input records=0
        Spilled Records=0
        Failed Shuffles=0
        Merged Map outputs=0
        GC time elapsed (ms)=452
        CPU time spent (ms)=4470
        Physical memory (bytes) snapshot=1628909568
        Virtual memory (bytes) snapshot=9392836608
        Total committed heap usage (bytes)=1279262720
    File Input Format Counters 
        Bytes Read=113055
15/11/15 09:16:54 ERROR streaming.StreamJob: Job not successful!
Streaming Command Failed!

Why does the piping command work, but the Hadoop execution fail?

jum4pzuy (answer 1)

The reducer's Python script was generating the error because the variable curr_show was declared only inside the for loop that reads the input lines. The reason the error occurred only with the hadoop command and not with the piping command comes down to scoping (I am very new to this): Hadoop launched several reduce tasks whose partitions were empty, so the loop body never ran and curr_show was never defined when the final print executed.
By declaring curr_show before the for loop, the final print command is able to execute.

import sys

prev_show = "  "   # initialize previous show to a blank string
line_cnt  = 0      # count input lines
count     = 0      # keep a running total
curr_show = "  "   # declared before the loop so the final print cannot hit an unbound name

for line in sys.stdin:
    line       = line.strip()           #strip out carriage return
    key_value  = line.split('\t')       #split line, into key and value, returns a list
    line_cnt   = line_cnt+1   
    curr_show  = key_value[0]             #key is first item in list, indexed by 0
    value_in   = key_value[1]             #value is 2nd item

    if curr_show != prev_show and line_cnt > 1:
        # a new show has started, so emit the total for the previous show
        print (prev_show + "\t" + str(count))
        count = int(value_in)   # reset to the current value so this line's views are not lost
    else:
        count = count + int(key_value[1])

    prev_show = curr_show  #set up previous show for the next set of input lines.

print (curr_show + "\t" + str(count))

Also, the count variable was changed to reset to the current value_in, so that the current line's view count is not lost when the show changes.
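Given the posted counters (Map output records=0 and Launched reduce tasks=4), every reduce task received an empty partition, so the loop body never ran and the trailing print hit an unbound curr_show. A defensive variant that guards the final emit (a sketch, not the graded solution) looks like:

#!/usr/bin/env python
import sys

prev_show = None
count = 0

for line in sys.stdin:
    show, views = line.strip().split("\t")
    if prev_show is not None and show != prev_show:
        print(prev_show + "\t" + str(count))   # emit total for the finished show
        count = 0
    count += int(views)
    prev_show = show

# Only emit if this reducer actually saw input; an empty partition
# would otherwise raise a NameError here.
if prev_show is not None:
    print(prev_show + "\t" + str(count))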

r6hnlfcb (answer 2)

It looks like you are not using the hadoop streaming command correctly. Instead of

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
   -input /user/cloudera/input \
   -output /user/cloudera/output_join \   
   -mapper /home/cloudera/mapper.py \   
   -reducer /home/cloudera/reducer.py

you need to provide the actual mapper command to -mapper. Try:

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
   -input /user/cloudera/input \
   -output /user/cloudera/output_join \  
   -mapper "python mapper.py" \   
   -reducer "python reducer.py" \
   -file /home/cloudera/mapper.py \   
   -file /home/cloudera/reducer.py
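For what it's worth, -file ships each local script into the task's working directory on every node, which is why the relative names in -mapper "python mapper.py" resolve. If you point -mapper directly at a path instead, that script must exist on the worker node, be executable (chmod +x), and start with a #!/usr/bin/env python shebang.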

Also, check the error logs by opening any of the failed tasks from the tracking URL, since the log above is not of much help.

hwamh0ep (answer 3)

This mapper and reducer still don't work for me; I get the exception below. Did any of you find the problem?
The command used for this was:

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
   -input /user/cloudera/input_join \
   -output /user/cloudera/output_join2 \
   -mapper '/home/cloudera/join2.mapper.py' \
   -reducer '/home/cloudera/join2.reducer.py'

Error log:

FATAL [IPC Server handler 5 on 51645] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Task: attempt_1449644802746_0003_m_000001_0 - exited : java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:322)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:535)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
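"subprocess failed with code 1" from PipeMapRed.waitOutputThreads() means the streaming script itself exited abnormally. With comma-separated input, one classic cause is an IndexError on a blank or trailing line, where key_value[1] does not exist; a defensive parse (a sketch of the idea, not the course's mapper) skips such lines:

#!/usr/bin/env python
import sys

for line in sys.stdin:
    parts = line.strip().split(",")
    if len(parts) != 2:
        continue   # skip blank or malformed lines instead of crashing the task
    show, value = parts
    print(show + "\t" + value)

Piping the input files through the script locally (as in the cat command above) reproduces the same traceback outside Hadoop.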
