大数据:MapReduce 文件分发与打包 —— 以wordCount为例

x33g5p2x  于2022-06-08 转载在 MapReduce  
字(6.0k)|赞(0)|评价(0)|浏览(768)

一、文件分发与打包 -file为例

1、启动hadoop集群

  1. $HADOOP_HOME/sbin/start-all.sh

2、准备好数据并上传HDFS

  1. # 创建文件夹
  2. hadoop fs -mkdir /test
  3. #上传数据到HDFS文件
  4. hadoop fs -put 1.data /test/
  5. hadoop fs -put The_Man_of_Property.txt /test/

2.1 白名单 white_list 创建

vi white_list

  1. suitable
  2. against
  3. recent

3、map.py

vi map.py

  1. #!/usr/bin/python
  2. import sys
  3. import time
  4. def read_local_file_func(f):
  5. word_set = set()
  6. file_in = open(f, 'r')
  7. for line in file_in:
  8. word = line.strip()
  9. word_set.add(word)
  10. return word_set
  11. def mapper_func(white_list_fd):
  12. word_set = read_local_file_func(white_list_fd)
  13. for line in sys.stdin:
  14. ss = line.strip().split(' ')
  15. for s in ss:
  16. word = s.strip()
  17. if word != "" and (word in word_set):
  18. print "%s\t%s" % (s, 1)
  19. if __name__ == "__main__":
  20. module = sys.modules[__name__]
  21. func = getattr(module, sys.argv[1])
  22. args = None
  23. if len(sys.argv) > 1:
  24. args = sys.argv[2:]
  25. func(*args)

cat The_Man_of_Property.txt | python map.py mapper_func white_list| head -10
运行map.py,调用 mapper_func 函数, 以 white_list 白名单为例,获取有关于 white_list 头15条的数据结果:map 切分单词完成,接下来用red统计单词次数。

4、red.py

  1. #!/usr/bin/python
  2. import sys
  3. def reduer_func():
  4. current_word = None
  5. count_pool = []
  6. sum = 0
  7. for line in sys.stdin:
  8. word, val = line.strip().split('\t')
  9. if current_word == None:
  10. current_word = word
  11. if current_word != word:
  12. for count in count_pool:
  13. sum += count
  14. print "%s\t%s" % (current_word, sum)
  15. current_word = word
  16. count_pool = []
  17. sum = 0
  18. count_pool.append(int(val))
  19. for count in count_pool:
  20. sum += count
  21. print "%s\t%s" % (current_word, str(sum))
  22. if __name__ == "__main__":
  23. module = sys.modules[__name__]
  24. func = getattr(module, sys.argv[1])
  25. args = None
  26. if len(sys.argv) > 1:
  27. args = sys.argv[2:]
  28. func(*args)
  1. cat The_Man_of_Property.txt | python map.py mapper_func white_list | sort -k1 | python red.py reduer_func

运行map reduce程序,统计The_Man_of_Property.txt文件中有 white_list 的单词的单词数量。

也可以把统计结果保存到 1.result文件

  1. cat The_Man_of_Property.txt | python map.py mapper_func white_list | sort -k1 | python red.py reduer_func > 1.result

5、用脚本一步到位

创建脚本 vi run.sh

  1. HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop"
  2. STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"
  3. INPUT_FILE_PATH_1="/test/The_Man_of_Property.txt"
  4. OUTPUT_PATH="/output_file_test"
  5. $HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH
  6. # Step 1.
  7. $HADOOP_CMD jar $STREAM_JAR_PATH \
  8. -input $INPUT_FILE_PATH_1 \
  9. -output $OUTPUT_PATH \
  10. -mapper "python map.py mapper_func white_list" \
  11. -reducer "python red.py reduer_func" \
  12. -jobconf "mapred.reduce.tasks=3" \
  13. -file ./map.py \
  14. -file ./red.py \
  15. -file ./white_list

运行脚本:sh -x run.sh ,正常运行会有 map与reduce 的进度。
最后,查看你的 OUTPUT_PATH 路径:例如:

为什么有三个结果文件,各文件的数据是什么?

脚本中用了 -jobconf “mapred.reduce.tasks=3”,因此才会得出三个结果文件,把结果随机分到文件,如上图所示。

  1. 查看数据文件
  2. hadoop fs -cat /output_file_test/part-00000
  3. hadoop fs -cat /output_file_test/part-00001
  4. hadoop fs -cat /output_file_test/part-00002
  5. hadoop fs -cat /output_file_test/pa*

如何杀死hadoop job工作,不能用Ctrl+C,因为还是会运行,没真正杀死。

  1. hadoop job -kill job_1654570610473_0004

这样才杀死。

二、文件分发与打包 -cachefile 为例

map.py 与 red.py 都是一样的,run脚本有些改动。
事先把白名单 white_list 上传到HDFS上。

  1. hadoop fs -put white_list /test/

接着,编辑run脚本
vim run2.sh

  1. HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop"
  2. STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"
  3. INPUT_FILE_PATH_1="/test/The_Man_of_Property.txt"
  4. OUTPUT_PATH="/output_cachefile_broadcast"
  5. $HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH
  6. # Step 1.
  7. $HADOOP_CMD jar $STREAM_JAR_PATH \
  8. -input $INPUT_FILE_PATH_1 \
  9. -output $OUTPUT_PATH \
  10. -mapper "python map.py mapper_func ABC" \
  11. -reducer "python red.py reduer_func" \
  12. -jobconf "mapred.reduce.tasks=2" \
  13. -jobconf "mapred.job.name=cachefile_demo" \
  14. -cacheFile "hdfs://master:9000/test/white_list#ABC" \
  15. -file "./map.py" \
  16. -file "./red.py"
  17. #-cacheFile "$HDFS_FILE_PATH#WH" \
  1. 运行脚本 bash run2.sh

-mapper “python map.py mapper_func ABC”
后面ABC是一个参数
-cacheFile “hdfs://master:9000/test/white_list#ABC”
-cacheFile 是需要HDFS的 white_list文件路径再加#ABC参数;与mapper的ABC参数对应。

查找MapReduce运行的job作业

查找运行的job作业,需要在从节点查找。

  1. 22/06/07 19:51:25 INFO mapreduce.Job: Running job: job_1654570610473_0006
  1. 进入目录:
  2. cd /usr/local/src/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/appcache
  3. 逐步进入里面的文件,可以看到 ABC文件,也就是说 white_list 通过 -cacheFile 可以用参数ABC代替; white_list《==》ABC

三、文件分发与打包 -cacheArchive

如果要分发的文件有目录结构,可以先将整个目录打包,然后上传到HDFS。
简单来说,之前我们都是对一个白名单white_list 文件操作,现在有多个白名单white_list1 white_list2 white_list3 文件,那要怎么办?
我们就要实现把多个白名单white_list 进行打包压缩。

  1. 打包为 white.tar.gz 命令:
  2. tar -cvzf white.tar.gz white_list_1 white_list_2

把文件上次到HDFS

  1. hadoop fs -put white.tar.gz /test/

3.1 map.py

  1. #!/usr/bin/python
  2. import os
  3. import sys
  4. import gzip
  5. import time
  6. def get_file_handler(f):
  7. file_in = open(f, 'r')
  8. return file_in
  9. def get_cachefile_handlers(f):
  10. f_handlers_list = []
  11. if os.path.isdir(f):
  12. for fd in os.listdir(f):
  13. f_handlers_list.append(get_file_handler(f + '/' + fd))
  14. return f_handlers_list
  15. def read_local_file_func(f):
  16. word_set = set()
  17. for cachefile in get_cachefile_handlers(f):
  18. for line in cachefile:
  19. word = line.strip()
  20. word_set.add(word)
  21. return word_set
  22. def mapper_func(white_list_fd):
  23. word_set = read_local_file_func(white_list_fd)
  24. for line in sys.stdin:
  25. ss = line.strip().split(' ')
  26. for s in ss:
  27. word = s.strip()
  28. if word != "" and (word in word_set):
  29. print "%s\t%s" % (s, 1)
  30. if __name__ == "__main__":
  31. module = sys.modules[__name__]
  32. func = getattr(module, sys.argv[1])
  33. args = None
  34. if len(sys.argv) > 1:
  35. args = sys.argv[2:]
  36. func(*args)

3.2 red.py 不用改,新建一个脚本run3.sh

vim run3.sh

  1. HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop"
  2. STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"
  3. INPUT_FILE_PATH_1="/test/The_Man_of_Property.txt"
  4. OUTPUT_PATH="/output_cachearchive_broadcast"
  5. $HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH
  6. # Step 1.
  7. $HADOOP_CMD jar $STREAM_JAR_PATH \
  8. -input $INPUT_FILE_PATH_1 \
  9. -output $OUTPUT_PATH \
  10. -mapper "python map.py mapper_func WH.gz" \
  11. -reducer "python red.py reduer_func" \
  12. -jobconf "mapred.reduce.tasks=2" \
  13. -jobconf "mapred.job.name=cachefile_demo" \
  14. -cacheArchive "hdfs://master:9000/test/white.tar.gz#WH.gz" \
  15. -file "./map.py" \
  16. -file "./red.py"

-mapper 那里WH.gz 与 -cacheArchive #WH.gz 是一致的,WH.gz不是一个文件,是一个文件目录来着。
看下图:

  1. 运行脚本 bash run3.sh

可以看到,hadoop 会自动解压缩文件,并进行wordCount。

针对实际情况,我们用不同的 -file -cachefile -cachearchive 方法wordCount实现统计词频。

相关文章