Big Data: Simple Sorting and Bucket Sorting of Data, Taking WordCount as an Example


1. Sorting from the command line

How do we sort data that lives in small files?
Suppose we have a file a.txt:

  1 hadoop
  3 hadoop
  5 hadoop
  7 hadoop
  9 hadoop
  11 hadoop
  13 hadoop
  15 hadoop
  17 hadoop
  19 hadoop
  21 hadoop
  23 hadoop
  25 hadoop
  27 hadoop
  29 hadoop
  31 hadoop
  33 hadoop
  35 hadoop
  37 hadoop
  39 hadoop
  41 hadoop
  43 hadoop
  45 hadoop
  47 hadoop
  49 hadoop
  51 hadoop
  53 hadoop
  55 hadoop
  57 hadoop
  59 hadoop
  61 hadoop
  63 hadoop
  65 hadoop
  67 hadoop
  69 hadoop
  71 hadoop
  73 hadoop
  75 hadoop
  77 hadoop
  79 hadoop
  81 hadoop
  83 hadoop
  85 hadoop
  87 hadoop
  89 hadoop
  91 hadoop
  93 hadoop
  95 hadoop
  97 hadoop
  99 hadoop

and b.txt:

  0 java
  2 java
  4 java
  6 java
  8 java
  10 java
  12 java
  14 java
  16 java
  18 java
  20 java
  22 java
  24 java
  26 java
  28 java
  30 java
  32 java
  34 java
  36 java
  38 java
  40 java
  42 java
  44 java
  46 java
  48 java
  50 java
  52 java
  54 java
  56 java
  58 java
  60 java
  62 java
  64 java
  66 java
  68 java
  70 java
  72 java
  74 java
  76 java
  78 java
  80 java
  82 java
  84 java
  86 java
  88 java
  90 java
  92 java
  94 java
  96 java
  98 java
  100 java

Now suppose we need to sort a.txt and b.txt together as one dataset. What are the options?
1. Ascending order: cat a.txt b.txt | sort -k1 -n | head

2. Descending order, saving the result to c.txt: cat a.txt b.txt | sort -k1 -nr | head > c.txt
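
If you want to recreate the two test files yourself, a minimal generator sketch is shown below. It assumes the key and value in each line are separated by a tab, which is what map_sort.py later splits on; the file name gen_data.py is only illustrative and is not part of the original post.

  #!/usr/bin/env python
  # gen_data.py (hypothetical helper): recreate the test data used in this post.
  # a.txt gets the odd keys 1..99 labelled "hadoop", b.txt the even keys 0..100
  # labelled "java"; each line is "key<TAB>value", as map_sort.py expects.
  with open('a.txt', 'w') as fa:
      for i in range(1, 100, 2):
          fa.write('%d\thadoop\n' % i)
  with open('b.txt', 'w') as fb:
      for i in range(0, 101, 2):
          fb.write('%d\tjava\n' % i)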

2. Simple sorting for a small data volume (no bucketing)

2.1 Upload the files to HDFS

  hadoop fs -put a.txt /test/
  hadoop fs -put b.txt /test/

2.2 map_sort.py

Create the map program: vim map_sort.py

  #!/usr/local/bin/python
  import sys

  # Offset added to every key so that all keys have the same number of digits.
  base_count = 10000
  #base_count = 99999

  for line in sys.stdin:
      ss = line.strip().split('\t')
      key = ss[0]
      val = ss[1]
      #new_key = base_count - int(key)    # alternative: descending order (pair with base_count = 99999)
      new_key = base_count + int(key)
      print "%s\t%s" % (new_key, val)

In MapReduce the keys are sorted as strings by default, so every key must have the same length; that is why we add base_count = 10000 to each key, turning every key into a 5-digit number.
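
To see concretely why the offset matters, compare a plain string sort of the raw keys with a string sort of the offset keys. This is just a quick illustration run in an interactive interpreter, not code from the original post:

  # String comparison puts "100" before "2", so raw keys sort incorrectly:
  raw = ['2', '100', '31']
  print sorted(raw)        # ['100', '2', '31']

  # Adding base_count = 10000 pads every key to five digits, so
  # lexicographic order now matches numeric order:
  base_count = 10000
  padded = [str(base_count + int(k)) for k in raw]
  print sorted(padded)     # ['10002', '10031', '10100']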

  Map-side sort test: cat a.txt b.txt | python map_sort.py | sort -k1 | head

2.3 red_sort.py

vim red_sort.py

  #!/usr/local/bin/python
  import sys

  # Must match the offset used in map_sort.py.
  base_value = 10000
  #base_value = 99999

  for line in sys.stdin:
      key, val = line.strip().split('\t')
      print str(int(key) - base_value) + "\t" + val
      #print str(base_value - int(key)) + "\t" + val    # alternative for the descending variant

Simulate the whole sort locally: cat a.txt b.txt | python map_sort.py | sort -k1 | python red_sort.py | head -20
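
As a quick sanity check, you can pipe that output into a small script that verifies the keys really come out in ascending numeric order. check_sorted.py is a hypothetical helper, not part of the original post:

  #!/usr/local/bin/python
  # check_sorted.py (hypothetical helper): read "key<TAB>value" lines from stdin
  # and report whether the keys are in non-decreasing numeric order.
  import sys

  prev = None
  for line in sys.stdin:
      key = int(line.strip().split('\t')[0])
      if prev is not None and key < prev:
          print "out of order: %d after %d" % (key, prev)
          sys.exit(1)
      prev = key
  print "keys are in ascending order"

For example: cat a.txt b.txt | python map_sort.py | sort -k1 | python red_sort.py | python check_sorted.py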

2.4 run.sh

A script to run the whole sort in one step: vim run.sh. The script needs to:

  1. set the input file paths: INPUT_FILE_PATH_A, INPUT_FILE_PATH_B
  2. set the output path: OUTPUT_SORT_PATH
  3. specify the mapper
  4. specify the reducer

  set -e -x

  HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop"
  STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"

  INPUT_FILE_PATH_A="/test/a.txt"
  INPUT_FILE_PATH_B="/test/b.txt"
  OUTPUT_SORT_PATH="/output_sort"

  # Remove any previous output directory before re-running the job.
  #$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_SORT_PATH

  # Submit the streaming job; a single reducer gives one globally sorted output file.
  $HADOOP_CMD jar $STREAM_JAR_PATH \
      -input $INPUT_FILE_PATH_A,$INPUT_FILE_PATH_B \
      -output $OUTPUT_SORT_PATH \
      -mapper "python map_sort.py" \
      -reducer "python red_sort.py" \
      -jobconf "mapred.reduce.tasks=1" \
      -file ./map_sort.py \
      -file ./red_sort.py

Run the script: bash run.sh

Check the result: hadoop fs -cat /output_sort/p* | head

OK, that is a MapReduce sort for a small data volume.

What do we do when the data volume is large?
We need several reducers working in parallel, i.e. we increase the reduce task count that was set with -jobconf "mapred.reduce.tasks=1".

Suppose there are 2 reduce tasks, with keys 1-50 going to one reducer and keys 51-100 to the other.
This is where bucket sorting comes in!
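
The bucket boundary used below can be worked out in advance: the offset keys range from 10000 to 10100, so the mapper uses the midpoint (10000 + 10100) / 2 = 10050 as the split point. A small sketch of that calculation (the function name bucket_of is only illustrative, not from the original post):

  base_count = 10000
  threshold = (10000 + 10100) / 2    # midpoint of the offset key range = 10050

  def bucket_of(key):
      # Offset keys below the midpoint go to reducer 0, the rest to reducer 1.
      new_key = base_count + int(key)
      return 0 if new_key < threshold else 1

  print bucket_of(1), bucket_of(49)      # 0 0
  print bucket_of(51), bucket_of(100)    # 1 1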

3. Concurrent, bucketed sorting

3.1 map_sort.py

  #!/usr/local/bin/python
  import sys

  base_count = 10000

  for line in sys.stdin:
      ss = line.strip().split('\t')
      key = ss[0]
      val = ss[1]
      new_key = base_count + int(key)

      # Pick the reducer (bucket): offset keys run from 10000 to 10100, so keys
      # below the midpoint 10050 go to reducer 0 and the rest to reducer 1.
      red_idx = 1
      if new_key < (10100 + 10000) / 2:
          red_idx = 0

      print "%s\t%s\t%s" % (red_idx, new_key, val)

3.2 red_sort.py

  #!/usr/local/bin/python
  import sys

  base_count = 10000

  for line in sys.stdin:
      # The first field is the reducer index added by the mapper; drop it.
      idx_id, key, val = line.strip().split('\t')
      new_key = int(key) - base_count
      print '\t'.join([str(new_key), val])
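
Before submitting the job, you can simulate the two-bucket flow locally. The sketch below assumes a.txt and b.txt are tab-separated files in the current directory and that the map_sort.py and red_sort.py from this section sit next to it; local_bucket_test.py is a hypothetical name, not from the original post:

  #!/usr/local/bin/python
  # local_bucket_test.py (hypothetical helper): simulate the 2-reducer job locally.
  # Run the map step, partition lines by the first field (the reducer index),
  # sort each bucket by key, then run the reduce step once per bucket.
  import subprocess

  # Map: feed both input files through map_sort.py.
  map_out = subprocess.Popen(
      'cat a.txt b.txt | python map_sort.py',
      shell=True, stdout=subprocess.PIPE).communicate()[0]

  # Partition: group lines by the reducer index emitted by the mapper.
  buckets = {'0': [], '1': []}
  for line in map_out.strip().split('\n'):
      buckets[line.split('\t')[0]].append(line)

  # Sort each bucket by its key and reduce it, mimicking one output file per reducer.
  for idx in sorted(buckets):
      bucket = sorted(buckets[idx], key=lambda l: l.split('\t')[1])
      red = subprocess.Popen('python red_sort.py', shell=True,
                             stdin=subprocess.PIPE, stdout=subprocess.PIPE)
      out = red.communicate('\n'.join(bucket) + '\n')[0]
      print '== bucket %s ==' % idx
      print out.strip()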

3.3 run.sh

  set -e -x

  HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop"
  STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"

  INPUT_FILE_PATH_A="/test/a.txt"
  INPUT_FILE_PATH_B="/test/b.txt"
  OUTPUT_SORT_PATH="/output_sort"

  # Remove the previous output directory so the job can be re-run.
  $HADOOP_CMD fs -rmr -skipTrash $OUTPUT_SORT_PATH

  # Submit the streaming job with 2 reducers. The mapper emits
  # "red_idx<TAB>key<TAB>value": the first two fields form the map output key
  # (stream.num.map.output.key.fields=2), but only the first field is used for
  # partitioning (num.key.fields.for.partition=1 with KeyFieldBasedPartitioner),
  # so each bucket goes to its own reducer and stays sorted by key inside it.
  $HADOOP_CMD jar $STREAM_JAR_PATH \
      -input $INPUT_FILE_PATH_A,$INPUT_FILE_PATH_B \
      -output $OUTPUT_SORT_PATH \
      -mapper "python map_sort.py" \
      -reducer "python red_sort.py" \
      -file ./map_sort.py \
      -file ./red_sort.py \
      -jobconf mapred.reduce.tasks=2 \
      -jobconf stream.num.map.output.key.fields=2 \
      -jobconf num.key.fields.for.partition=1 \
      -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

Run the script: bash run.sh

Check the result: hadoop fs -ls /output_sort

You can see that the data has indeed been split into two parts, 1-50 and 51-100.
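
To double-check the split, you can copy the output locally (for example with hadoop fs -get /output_sort ./output_sort) and look at each part file's key range. A minimal sketch, assuming the usual part-00000 and part-00001 file names:

  # Print the smallest and largest key found in each reducer's output file.
  for name in ['output_sort/part-00000', 'output_sort/part-00001']:
      keys = [int(line.split('\t')[0]) for line in open(name)]
      print '%s: keys %d..%d (%d lines)' % (name, min(keys), max(keys), len(keys))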
