$HADOOP_HOME/sbin/start-all.sh
# 创建文件夹
hadoop fs -mkdir /test
#上传数据到HDFS文件
hadoop fs -put 1.data /test/
hadoop fs -put The_Man_of_Property.txt /test/
vi white_list
suitable
against
recent
vi map.py
#!/usr/bin/python
import sys
import time
def read_local_file_func(f):
word_set = set()
file_in = open(f, 'r')
for line in file_in:
word = line.strip()
word_set.add(word)
return word_set
def mapper_func(white_list_fd):
word_set = read_local_file_func(white_list_fd)
for line in sys.stdin:
ss = line.strip().split(' ')
for s in ss:
word = s.strip()
if word != "" and (word in word_set):
print "%s\t%s" % (s, 1)
if __name__ == "__main__":
module = sys.modules[__name__]
func = getattr(module, sys.argv[1])
args = None
if len(sys.argv) > 1:
args = sys.argv[2:]
func(*args)
cat The_Man_of_Property.txt | python map.py mapper_func white_list| head -10
运行map.py,调用 mapper_func 函数, 以 white_list 白名单为例,获取有关于 white_list 头15条的数据结果:map 切分单词完成,接下来用red统计单词次数。
#!/usr/bin/python
import sys
def reduer_func():
current_word = None
count_pool = []
sum = 0
for line in sys.stdin:
word, val = line.strip().split('\t')
if current_word == None:
current_word = word
if current_word != word:
for count in count_pool:
sum += count
print "%s\t%s" % (current_word, sum)
current_word = word
count_pool = []
sum = 0
count_pool.append(int(val))
for count in count_pool:
sum += count
print "%s\t%s" % (current_word, str(sum))
if __name__ == "__main__":
module = sys.modules[__name__]
func = getattr(module, sys.argv[1])
args = None
if len(sys.argv) > 1:
args = sys.argv[2:]
func(*args)
cat The_Man_of_Property.txt | python map.py mapper_func white_list | sort -k1 | python red.py reduer_func
运行map reduce程序,统计The_Man_of_Property.txt文件中有 white_list 的单词的单词数量。
也可以把统计结果保存到 1.result文件
cat The_Man_of_Property.txt | python map.py mapper_func white_list | sort -k1 | python red.py reduer_func > 1.result
创建脚本 vi run.sh
HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"
INPUT_FILE_PATH_1="/test/The_Man_of_Property.txt"
OUTPUT_PATH="/output_file_test"
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH
# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_1 \
-output $OUTPUT_PATH \
-mapper "python map.py mapper_func white_list" \
-reducer "python red.py reduer_func" \
-jobconf "mapred.reduce.tasks=3" \
-file ./map.py \
-file ./red.py \
-file ./white_list
运行脚本:sh -x run.sh ,正常运行会有 map与reduce 的进度。
最后,查看你的 OUTPUT_PATH 路径:例如:
为什么有三个结果文件,各文件的数据是什么?
脚本中用了 -jobconf “mapred.reduce.tasks=3”,因此才会得出三个结果文件,把结果随机分到文件,如上图所示。
查看数据文件
hadoop fs -cat /output_file_test/part-00000
hadoop fs -cat /output_file_test/part-00001
hadoop fs -cat /output_file_test/part-00002
hadoop fs -cat /output_file_test/pa*
如何杀死hadoop job工作,不能用Ctrl+C,因为还是会运行,没真正杀死。
hadoop job -kill job_1654570610473_0004
这样才杀死。
map.py 与 red.py 都是一样的,run脚本有些改动。
事先把白名单 white_list 上传到HDFS上。
hadoop fs -put white_list /test/
接着,编辑run脚本
vim run2.sh
HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"
INPUT_FILE_PATH_1="/test/The_Man_of_Property.txt"
OUTPUT_PATH="/output_cachefile_broadcast"
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH
# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_1 \
-output $OUTPUT_PATH \
-mapper "python map.py mapper_func ABC" \
-reducer "python red.py reduer_func" \
-jobconf "mapred.reduce.tasks=2" \
-jobconf "mapred.job.name=cachefile_demo" \
-cacheFile "hdfs://master:9000/test/white_list#ABC" \
-file "./map.py" \
-file "./red.py"
#-cacheFile "$HDFS_FILE_PATH#WH" \
运行脚本 bash run2.sh
-mapper “python map.py mapper_func ABC”
后面ABC是一个参数
-cacheFile “hdfs://master:9000/test/white_list#ABC”
-cacheFile 是需要HDFS的 white_list文件路径再加#ABC参数;与mapper的ABC参数对应。
查找运行的job作业,需要在从节点查找。
22/06/07 19:51:25 INFO mapreduce.Job: Running job: job_1654570610473_0006
进入目录:
cd /usr/local/src/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/appcache
逐步进入里面的文件,可以看到 有ABC文件,也就是说 white_list 通过 -cacheFile 可以用参数ABC代替; white_list《==》ABC 。
如果要分发的文件有目录结构,可以先将整个目录打包,然后上传到HDFS。
简单来说,之前我们都是对一个白名单white_list 文件操作,现在有多个白名单white_list1 white_list2 white_list3 文件,那要怎么办?
我们就要实现把多个白名单white_list 进行打包压缩。
打包为 white.tar.gz 命令:
tar -cvzf white.tar.gz white_list_1 white_list_2
把文件上次到HDFS
hadoop fs -put white.tar.gz /test/
#!/usr/bin/python
import os
import sys
import gzip
import time
def get_file_handler(f):
file_in = open(f, 'r')
return file_in
def get_cachefile_handlers(f):
f_handlers_list = []
if os.path.isdir(f):
for fd in os.listdir(f):
f_handlers_list.append(get_file_handler(f + '/' + fd))
return f_handlers_list
def read_local_file_func(f):
word_set = set()
for cachefile in get_cachefile_handlers(f):
for line in cachefile:
word = line.strip()
word_set.add(word)
return word_set
def mapper_func(white_list_fd):
word_set = read_local_file_func(white_list_fd)
for line in sys.stdin:
ss = line.strip().split(' ')
for s in ss:
word = s.strip()
if word != "" and (word in word_set):
print "%s\t%s" % (s, 1)
if __name__ == "__main__":
module = sys.modules[__name__]
func = getattr(module, sys.argv[1])
args = None
if len(sys.argv) > 1:
args = sys.argv[2:]
func(*args)
vim run3.sh
HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"
INPUT_FILE_PATH_1="/test/The_Man_of_Property.txt"
OUTPUT_PATH="/output_cachearchive_broadcast"
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH
# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_1 \
-output $OUTPUT_PATH \
-mapper "python map.py mapper_func WH.gz" \
-reducer "python red.py reduer_func" \
-jobconf "mapred.reduce.tasks=2" \
-jobconf "mapred.job.name=cachefile_demo" \
-cacheArchive "hdfs://master:9000/test/white.tar.gz#WH.gz" \
-file "./map.py" \
-file "./red.py"
-mapper 那里WH.gz 与 -cacheArchive #WH.gz 是一致的,WH.gz不是一个文件,是一个文件目录来着。
看下图:
运行脚本 bash run3.sh
可以看到,hadoop 会自动解压缩文件,并进行wordCount。
针对实际情况,我们用不同的 -file -cachefile -cachearchive 方法wordCount实现统计词频。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_44775255/article/details/125170177
内容来源于网络,如有侵权,请联系作者删除!