大数据:MapReduce 文件分发与打包 —— 以wordCount为例

x33g5p2x  于2022-06-08 转载在 MapReduce  
字(6.0k)|赞(0)|评价(0)|浏览(662)

一、文件分发与打包 -file为例

1、启动hadoop集群

$HADOOP_HOME/sbin/start-all.sh

2、准备好数据并上传HDFS

# 创建文件夹
hadoop fs -mkdir /test
#上传数据到HDFS文件
hadoop fs -put 1.data /test/
hadoop fs -put The_Man_of_Property.txt /test/

2.1 白名单 white_list 创建

vi white_list

suitable
against
recent

3、map.py

vi map.py

#!/usr/bin/python
import sys
import time

def read_local_file_func(f):
    word_set = set()
    file_in = open(f, 'r')
    for line in file_in:
        word = line.strip()
        word_set.add(word)
    return word_set

def mapper_func(white_list_fd):
    word_set = read_local_file_func(white_list_fd)

    for line in sys.stdin:
        ss = line.strip().split(' ')
        for s in ss:
            word = s.strip()
            if word != "" and (word in word_set):
                print "%s\t%s" % (s, 1)

if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)

cat The_Man_of_Property.txt | python map.py mapper_func white_list| head -10
运行map.py,调用 mapper_func 函数, 以 white_list 白名单为例,获取有关于 white_list 头15条的数据结果:map 切分单词完成,接下来用red统计单词次数。

4、red.py

#!/usr/bin/python
import sys

def reduer_func():
    current_word = None
    count_pool = []
    sum = 0

    for line in sys.stdin:
        word, val = line.strip().split('\t')
        if current_word == None:
            current_word = word

        if current_word != word:
            for count in count_pool:
                sum += count
            print "%s\t%s" % (current_word, sum)
            current_word = word
            count_pool = []
            sum = 0

        count_pool.append(int(val))

    for count in count_pool:
        sum += count
    print "%s\t%s" % (current_word, str(sum))

if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
        func(*args)
cat The_Man_of_Property.txt | python map.py mapper_func white_list | sort -k1 | python red.py reduer_func

运行map reduce程序,统计The_Man_of_Property.txt文件中有 white_list 的单词的单词数量。

也可以把统计结果保存到 1.result文件

cat The_Man_of_Property.txt | python map.py mapper_func white_list | sort -k1 | python red.py reduer_func > 1.result

5、用脚本一步到位

创建脚本 vi run.sh

HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"

INPUT_FILE_PATH_1="/test/The_Man_of_Property.txt"
OUTPUT_PATH="/output_file_test"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py mapper_func white_list" \
    -reducer "python red.py reduer_func" \
    -jobconf "mapred.reduce.tasks=3" \
    -file ./map.py \
    -file ./red.py \
    -file ./white_list

运行脚本:sh -x run.sh ,正常运行会有 map与reduce 的进度。
最后,查看你的 OUTPUT_PATH 路径:例如:

为什么有三个结果文件,各文件的数据是什么?

脚本中用了 -jobconf “mapred.reduce.tasks=3”,因此才会得出三个结果文件,把结果随机分到文件,如上图所示。

查看数据文件
hadoop fs -cat /output_file_test/part-00000

hadoop fs -cat /output_file_test/part-00001
hadoop fs -cat /output_file_test/part-00002

hadoop fs -cat /output_file_test/pa*

如何杀死hadoop job工作,不能用Ctrl+C,因为还是会运行,没真正杀死。

hadoop job -kill job_1654570610473_0004

这样才杀死。

二、文件分发与打包 -cachefile 为例

map.py 与 red.py 都是一样的,run脚本有些改动。
事先把白名单 white_list 上传到HDFS上。

hadoop fs -put white_list /test/

接着,编辑run脚本
vim run2.sh

HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"

INPUT_FILE_PATH_1="/test/The_Man_of_Property.txt"
OUTPUT_PATH="/output_cachefile_broadcast"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py mapper_func ABC" \
    -reducer "python red.py reduer_func" \
    -jobconf "mapred.reduce.tasks=2" \
    -jobconf  "mapred.job.name=cachefile_demo" \
    -cacheFile "hdfs://master:9000/test/white_list#ABC" \
    -file "./map.py" \
    -file "./red.py"

    #-cacheFile "$HDFS_FILE_PATH#WH" \
运行脚本 bash run2.sh

-mapper “python map.py mapper_func ABC”
后面ABC是一个参数
-cacheFile “hdfs://master:9000/test/white_list#ABC”
-cacheFile 是需要HDFS的 white_list文件路径再加#ABC参数;与mapper的ABC参数对应。

查找MapReduce运行的job作业

查找运行的job作业,需要在从节点查找。

22/06/07 19:51:25 INFO mapreduce.Job: Running job: job_1654570610473_0006
进入目录:
cd /usr/local/src/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/appcache
逐步进入里面的文件,可以看到 有ABC文件,也就是说 white_list 通过 -cacheFile 可以用参数ABC代替; white_list《==》ABC 。

三、文件分发与打包 -cacheArchive

如果要分发的文件有目录结构,可以先将整个目录打包,然后上传到HDFS。
简单来说,之前我们都是对一个白名单white_list 文件操作,现在有多个白名单white_list1 white_list2 white_list3 文件,那要怎么办?
我们就要实现把多个白名单white_list 进行打包压缩。

打包为 white.tar.gz 命令:
tar -cvzf white.tar.gz white_list_1 white_list_2

把文件上次到HDFS

hadoop fs -put white.tar.gz /test/

3.1 map.py

#!/usr/bin/python
import os
import sys
import gzip
import time

def get_file_handler(f):
    file_in = open(f, 'r')
    return file_in

def get_cachefile_handlers(f):
    f_handlers_list = []
    if os.path.isdir(f):
        for fd in os.listdir(f):
            f_handlers_list.append(get_file_handler(f + '/' + fd))
    return f_handlers_list

def read_local_file_func(f):
    word_set = set()
    for cachefile in get_cachefile_handlers(f):
        for line in cachefile:
            word = line.strip()
            word_set.add(word)
    return word_set

def mapper_func(white_list_fd):
    word_set = read_local_file_func(white_list_fd)

    for line in sys.stdin:
        ss = line.strip().split(' ')
        for s in ss:
            word = s.strip()
            if word != "" and (word in word_set):
                print "%s\t%s" % (s, 1)

if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)

3.2 red.py 不用改,新建一个脚本run3.sh

vim run3.sh

HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"

INPUT_FILE_PATH_1="/test/The_Man_of_Property.txt"
OUTPUT_PATH="/output_cachearchive_broadcast"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py mapper_func WH.gz" \
    -reducer "python red.py reduer_func" \
    -jobconf "mapred.reduce.tasks=2" \
    -jobconf  "mapred.job.name=cachefile_demo" \
    -cacheArchive "hdfs://master:9000/test/white.tar.gz#WH.gz" \
    -file "./map.py" \
    -file "./red.py"

-mapper 那里WH.gz 与 -cacheArchive #WH.gz 是一致的,WH.gz不是一个文件,是一个文件目录来着。
看下图:

运行脚本 bash run3.sh

可以看到,hadoop 会自动解压缩文件,并进行wordCount。

针对实际情况,我们用不同的 -file -cachefile -cachearchive 方法wordCount实现统计词频。

相关文章