大数据:脚本实现WordCount,结果以压缩格式输出到HDFS

x33g5p2x  于2022-07-05 转载在 HDFS  
字(4.7k)|赞(0)|评价(0)|浏览(763)

一、需求:

输出数据量较大时,可以使用Hadoop提供的压缩机制对数据进行压缩,减少网络传输带宽和存储的消耗。

  • 可以指定对map的输出也就是中间结果进行压缩;
  • 可以指定对reduce的输出也就是最终输出进行压缩;
  • 可以指定是否压缩以及采用哪种压缩方式;
  • 对map输出进行压缩主要是为了减少shuffle过程中网络传输数据量 ;
  • 对reduce输出进行压缩主要是减少输出结果占用的HDFS存储。

二、将输出进行压缩

目的:实现WordCount,词频统计结果,以压缩格式输出到HDFS。

2.1 确定map.py

  1. #!/usr/bin/python
  2. import os
  3. import sys
  4. import gzip
  5. def get_file_handler(f):
  6. file_in = open(f, 'r')
  7. return file_in
  8. def get_cachefile_handlers(f):
  9. f_handlers_list = []
  10. if os.path.isdir(f):
  11. for fd in os.listdir(f):
  12. f_handlers_list.append(get_file_handler(f + '/' + fd))
  13. return f_handlers_list
  14. def read_local_file_func(f):
  15. word_set = set()
  16. for cachefile in get_cachefile_handlers(f):
  17. for line in cachefile:
  18. word = line.strip()
  19. word_set.add(word)
  20. return word_set
  21. def mapper_func(white_list_fd):
  22. word_set = read_local_file_func(white_list_fd)
  23. for line in sys.stdin:
  24. ss = line.strip().split(' ')
  25. for s in ss:
  26. word = s.strip()
  27. #if word != "" and (word in word_set):
  28. if word !="":
  29. print "%s\t%s" % (s, 1)
  30. if __name__ == "__main__":
  31. module = sys.modules[__name__]
  32. func = getattr(module, sys.argv[1])
  33. args = None
  34. if len(sys.argv) > 1:
  35. args = sys.argv[2:]
  36. func(*args)

2.2 red.py

  1. #!/usr/bin/python
  2. import sys
  3. def reduer_func():
  4. current_word = None
  5. count_pool = []
  6. sum = 0
  7. for line in sys.stdin:
  8. word, val = line.strip().split('\t')
  9. if current_word == None:
  10. current_word = word
  11. if current_word != word:
  12. for count in count_pool:
  13. sum += count
  14. print "%s\t%s" % (current_word, sum)
  15. current_word = word
  16. count_pool = []
  17. sum = 0
  18. count_pool.append(int(val))
  19. for count in count_pool:
  20. sum += count
  21. print "%s\t%s" % (current_word, str(sum))
  22. if __name__ == "__main__":
  23. module = sys.modules[__name__]
  24. func = getattr(module, sys.argv[1])
  25. args = None
  26. if len(sys.argv) > 1:
  27. args = sys.argv[2:]
  28. func(*args)

用脚本一键启动map.py 与 red.py。

怎么写脚本?

2.3 run.sh

  1. 确定:HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop"
  2. 确定:STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"
  3. 确定:input 输入文件的路径,"/test/The_Man_of_Property.txt"
  4. 确定:output 输出文件的路径,"/output_cachearchive_broadcast"
  5. 创建 mapper程序
  6. 创建 reduce程序
  1. HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop"
  2. STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"
  3. INPUT_FILE_PATH_1="/test/The_Man_of_Property.txt"
  4. OUTPUT_PATH="/output_cachearchive_broadcast"
  5. # 输出文件已存在就删掉,避免因为文件存在导致的运行报错
  6. $HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH
  7. # Step 1.
  8. $HADOOP_CMD jar $STREAM_JAR_PATH \
  9. -input $INPUT_FILE_PATH_1 \
  10. -output $OUTPUT_PATH \
  11. -mapper "python map.py mapper_func WH.gz" \
  12. -reducer "python red.py reduer_func" \
  13. -jobconf "mapred.reduce.tasks=5" \
  14. -jobconf "mapred.job.name=cachefile_demo" \
  15. -jobconf "mapred.compress.map.output=true" \
  16. -jobconf "mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
  17. -jobconf "mapred.output.compress=true" \
  18. -jobconf "mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
  19. -cacheArchive "hdfs://master:9000/test/white.tar.gz#WH.gz" \
  20. -file "./map.py" \
  21. -file "./red.py"

-mapper “python map.py mapper_func WH.gz”
-mapper意思是map函数;
python map.py mapper_func 意思是调用python的map.py程序里的mapper_func函数方法;
后面的WH.gz是一个参数,参数对应的是:-cacheArchive的#WH.gz;
-cacheArchive “hdfs://master:9000/test/white.tar.gz#WH.gz”
WH.gz 就是代表 white.tar.gz
-jobconf:提交作业的一些配置属性
常见配置:
(1)mapred.map.tasks:map task数目
(2)mapred.reduce.tasks:reduce task数目

mapred.job.name作业名
mapred.job.priority作业优先级
mapred.compress.map.outputmap的输出是否压缩
mapred.map.output.compression.codecmap的输出压缩方式
mapred.output.compressreduce的输出是否压缩
mapred.output.compression.codecreduce的输出压缩方式
mapred.job.map.capacity最多同时运行map任务数
mapred.job.reduce.capacity最多同时运行reduce任务数
mapred.task.timeout任务没有响应(输入输出)的最大时间

run.sh 脚本执行完毕,查看结果:hadoop fs -ls /output_cachearchive_broadcast

可以看到输出结果是五个压缩包。看看里面的内容:

1. 用cat方法hadoop fs -cat /output_cachearchive_broadcast/part-00000.gz | head ,会乱码,查看不了,得换一种。

2. 用text方法hadoop fs -text /output_cachearchive_broadcast/part-00000.gz | head 可以查看压缩文件内容!!

完成:实现WordCount,词频统计结果,以压缩格式输出到HDFS。

如何对这些压缩文件解压呢?

用脚本一键解压。

三、解压缩文件

3.1 设置脚本

  1. 确定:HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop"
  2. 确定:STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"
  3. 确定:input 压缩文件的路径,"/output_cachearchive_broadcast"
  4. 确定:output 输出解压文件的路径,"/output_cat"
  5. mapper 为 "cat"
  6. -jobconf map red 任务为0
  1. HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop"
  2. STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"
  3. INPUT_PATH="/output_cachearchive_broadcast"
  4. OUTPUT_PATH="/output_cat"
  5. #$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH
  6. # Step 2.
  7. $HADOOP_CMD jar $STREAM_JAR_PATH \
  8. -input $INPUT_PATH\
  9. -output $OUTPUT_PATH\
  10. -mapper "cat" \
  11. -jobconf "mapred.reduce.tasks=0"

运行脚本 run.sh

hadoop fs -ls /output_cat
1. 用cat方法hadoop fs -cat /output_cat/part-00000 | head

相关文章

最新文章

更多