与这篇博文有点关系的可以参考下:
① MapReduce 计算框架 —— 执行流程详解
② 在Linux环境实现wordcount:mapper,reducer的代码创建,脚本实现map,reduce
③ Linux实现 map 返回列表形式操作
④ Linux hadoop 脚本实现 reduce合并数据
有 a_join.txt 数据
user_id order_id
aaa1 123
aaa2 123
aaa3 123
aaa4 123
aaa5 123
aaa6 123
aaa7 123
aaa8 123
aaa9 123
aaa10 123
aaa11 123
有 b_join.txt 数据
user_id amount
aaa1 hadoop
aaa2 hadoop
aaa3 hadoop
aaa4 hadoop
aaa5 hadoop
aaa6 hadoop
aaa7 hadoop
aaa8 hadoop
aaa9 hadoop
aaa10 hadoop
aaa11 hadoop
使得两个数据,以key合成新的数据。
user_id order_id amount
aaa1 123 hadoop
aaa2 123 hadoop
怎么办?
可以先把 a_join 转为 map_a;
aaa1 1 123
aaa2 1 123
同理,b_join 转为 map_b;
aaa1 2 hadoop
aaa2 2 hadoop
vi map_a.py
#!/usr/local/bin/python
import sys
for line in sys.stdin:
ss = line.strip().split(' ')
key = ss[0]
val = ss[1]
print ("%s\t1\t%s" % (key, val))
把输出数据复制到 a1 中
vi map_b.py
#!/usr/local/bin/python
import sys
for line in sys.stdin:
ss = line.strip().split(' ')
key = ss[0]
val = ss[1]
print ("%s\t2\t%s" % (key, val))
把输出数据复制到 b2 中
要对两个map函数的返回结果文件a1,b2 进行reduce操作。
import sys
val_1 = ""
for line in sys.stdin:
key, flag, val = line.strip().split('\t')
if flag == '1':
val_1 = val
elif flag == '2' and val_1 != "":
val_2 = val
print ("%s\t%s\t%s" % (key, val_1, val_2))
val_1 = ""
如上图,已经把两个文件reduce成,我们想要的结果。
创建 run.sh
set -e -x
HADOOP_CMD="/usr/hadoop/hadoop-2.7.3/bin/hadoop"
# hdfs输入路径
INPUT_FILE_PATH_A="/test/a_join.txt"
INPUT_FILE_PATH_B="/test/b_join.txt"
# hdfs输出路径
OUTPUT_A_PATH="/output/a"
OUTPUT_B_PATH="/output/b"
OUTPUT_JOIN_PATH="/output/join"
# 已经有输出路径就进行删除
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_A_PATH $OUTPUT_B_PATH $OUTPUT_JOIN_PATH
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_JOIN_PATH
# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_A \
-output $OUTPUT_A_PATH \
-mapper "python map_a.py" \
-file ./map_a.py \
# Step 2.
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_B \
-output $OUTPUT_B_PATH \
-mapper "python map_b.py" \
-file ./map_b.py \
# Step 3.
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $OUTPUT_A_PATH,$OUTPUT_B_PATH \
-output $OUTPUT_JOIN_PATH \
-mapper "cat" \
-reducer "python red_join.py" \
-file ./red_join.py \
-jobconf stream.num.map.output.key.fields=2 \
-jobconf num.key.fields.for.partition=1
-mapper "cat"
意思是把前面两个map输出结果作为red的输入;
-jobconf stream.num.map.output.key.fields=2
意思是: 组合key=(aaa1 1), value=123;
-jobconf num.key.fields.for.partition=1
意思是:使用数据的第一列作为partition。
查看是否已经上次输入文件hadoop fs -ls /test
,没有就上传文件 hadoop fs -put ./a_join.txt b_join.txt /test/
。
查看输出文件是否存在,hadoop fs -ls /output
执行脚本命令:sh -x run.sh
查看输出目录的 /output/join
hadoop fs -ls /output/join
hadoop fs -cat /output/join | head
脚本实现完成!!!
【注意细节】:
(1)修改别人代码前,必须备份!!!
(2)查看别人代码时,习惯使用 :q! 强制退出不保存
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_44775255/article/details/121093295
内容来源于网络,如有侵权,请联系作者删除!