Big Data: Simple Sorting and Bucket Sorting of Data, with WordCount as the Example

x33g5p2x, reposted on 2022-07-05 in Other

I. Sorting on the command line

How do you sort the data in small files?
Take the file a.txt:

1	hadoop
3	hadoop
5	hadoop
7	hadoop
9	hadoop
11	hadoop
13	hadoop
15	hadoop
17	hadoop
19	hadoop
21	hadoop
23	hadoop
25	hadoop
27	hadoop
29	hadoop
31	hadoop
33	hadoop
35	hadoop
37	hadoop
39	hadoop
41	hadoop
43	hadoop
45	hadoop
47	hadoop
49	hadoop
51	hadoop
53	hadoop
55	hadoop
57	hadoop
59	hadoop
61	hadoop
63	hadoop
65	hadoop
67	hadoop
69	hadoop
71	hadoop
73	hadoop
75	hadoop
77	hadoop
79	hadoop
81	hadoop
83	hadoop
85	hadoop
87	hadoop
89	hadoop
91	hadoop
93	hadoop
95	hadoop
97	hadoop
99	hadoop

and the file b.txt:

0	java
2	java
4	java
6	java
8	java
10	java
12	java
14	java
16	java
18	java
20	java
22	java
24	java
26	java
28	java
30	java
32	java
34	java
36	java
38	java
40	java
42	java
44	java
46	java
48	java
50	java
52	java
54	java
56	java
58	java
60	java
62	java
64	java
66	java
68	java
70	java
72	java
74	java
76	java
78	java
80	java
82	java
84	java
86	java
88	java
90	java
92	java
94	java
96	java
98	java
100	java

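Now a.txt and b.txt have to be sorted together. How can that be done?
1. Ascending: cat a.txt b.txt | sort -k1 -n | head

2. Descending, saving the result to c.txt (head keeps only the first 10 lines): cat a.txt b.txt | sort -k1 -nr | head > c.txt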
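
For readers who want to check the result without the shell, the same merge-and-sort can be sketched in Python. This is only an illustrative equivalent of the pipeline above: the function name merge_and_sort and the short in-memory sample lines are assumptions made for the example, not part of the original post.

```python
def merge_and_sort(lines, reverse=False, limit=10):
    """Sort tab-separated "number<TAB>word" lines numerically by the first field.

    Roughly mirrors `sort -k1 -n | head` (or `-nr` when reverse=True).
    """
    records = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        key, val = line.split("\t")
        records.append((int(key), val))
    records.sort(key=lambda r: r[0], reverse=reverse)
    return records[:limit]


if __name__ == "__main__":
    # Small stand-ins for a.txt (odd numbers) and b.txt (even numbers).
    a_lines = ["1\thadoop", "3\thadoop", "11\thadoop"]
    b_lines = ["0\tjava", "2\tjava", "10\tjava"]
    for key, val in merge_and_sort(a_lines + b_lines):
        print("%d\t%s" % (key, val))
```

Passing reverse=True corresponds to the descending variant; limit plays the role of head.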

II. Simple sorting for small data volumes (no bucketing)

2.1 Upload the files to HDFS

hadoop fs -put  a.txt /test/
hadoop fs -put  b.txt /test/

2.2 map_sort.py

Create the map program: vim map_sort.py

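#!/usr/local/bin/python
# Mapper: shift every numeric key by a fixed offset so that all keys
# have the same width and sort correctly as strings.

import sys

base_count = 10000
#base_count = 99999   # descending variant, used with the commented new_key line

for line in sys.stdin:
    ss = line.strip().split('\t')
    if len(ss) < 2:
        continue  # skip blank or malformed lines
    key = ss[0]
    val = ss[1]

    #new_key = base_count - int(key)   # descending variant
    new_key = base_count + int(key)
    # print(...) with a single argument works under Python 2 and Python 3
    print("%s\t%s" % (new_key, val))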

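In MapReduce the keys are sorted during the shuffle, and by default they are compared as strings. The keys therefore need a uniform length, which is why the mapper adds base_count = 10000: the keys 0 to 100 become 10000 to 10100, all five digits wide, so string order and numeric order agree.

Test the map-side sort locally: cat a.txt b.txt | python map_sort.py | sort -k1 | head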
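
The effect of the offset is easy to see on a handful of keys. The snippet below is a small illustration, not part of the original scripts; the sample keys are made up, and BASE_COUNT simply repeats the value used in map_sort.py.

```python
BASE_COUNT = 10000  # same offset as base_count in map_sort.py

keys = [2, 100, 11, 9]  # made-up sample keys

# Sorting the raw keys as strings gives lexicographic order, not numeric order.
as_strings = sorted(str(k) for k in keys)

# After adding the offset every key has five digits, so string order
# and numeric order coincide.
padded = sorted(str(BASE_COUNT + k) for k in keys)
restored = [int(s) - BASE_COUNT for s in padded]

print(as_strings)  # ['100', '11', '2', '9']
print(padded)      # ['10002', '10009', '10011', '10100']
print(restored)    # [2, 9, 11, 100]
```

Note that the trick only holds while every shifted key keeps five digits, i.e. for original keys between 0 and 89999; larger inputs would need a larger offset.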

2.3 red_sort.py

Create the reduce program: vim red_sort.py

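#!/usr/local/bin/python
# Reducer: subtract the offset added by the mapper to restore the original key.

import sys

base_value = 10000
#base_value = 99999   # descending variant, must match the mapper

for line in sys.stdin:
    key, val = line.strip().split('\t')
    print(str(int(key) - base_value) + "\t" + val)
    #print(str(base_value - int(key)) + "\t" + val)   # descending variant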

Simulate the whole job locally and print the sorted output: cat a.txt b.txt | python map_sort.py | sort -k1 | python red_sort.py | head -20
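
The same local dry run can be written as one Python module, which is convenient for quick testing. This is a sketch under assumptions: the functions map_sort, red_sort and run_local are hypothetical re-implementations of the two scripts, and a plain string sort stands in for the shuffle.

```python
BASE = 10000  # same offset as base_count / base_value in the scripts


def map_sort(lines):
    """Mimic map_sort.py: shift the numeric key by BASE."""
    for line in lines:
        key, val = line.strip().split("\t")
        yield "%s\t%s" % (BASE + int(key), val)


def red_sort(lines):
    """Mimic red_sort.py: undo the shift."""
    for line in lines:
        key, val = line.strip().split("\t")
        yield "%s\t%s" % (int(key) - BASE, val)


def run_local(lines):
    """map -> sort (stand-in for the shuffle) -> reduce."""
    mapped = list(map_sort(lines))
    shuffled = sorted(mapped)  # plain string sort, like `sort -k1`
    return list(red_sort(shuffled))


if __name__ == "__main__":
    sample = ["11\thadoop", "2\tjava", "100\tjava", "9\thadoop"]
    for out in run_local(sample):
        print(out)
```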

2.4 run.sh

A script that runs the whole sort in one step: vim run.sh

  1. Set the input file paths: INPUT_FILE_PATH_A, INPUT_FILE_PATH_B
  2. Set the output path: OUTPUT_SORT_PATH
  3. Specify the mapper
  4. Specify the reducer

set -e -x

HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"

INPUT_FILE_PATH_A="/test/a.txt"
INPUT_FILE_PATH_B="/test/b.txt"

OUTPUT_SORT_PATH="/output_sort"

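# Uncomment to delete an existing output directory before a rerun;
# the job fails if the output path already exists.
#$HADOOP_CMD fs -rm -r -skipTrash $OUTPUT_SORT_PATH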

# Step 3.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_A,$INPUT_FILE_PATH_B \
    -output $OUTPUT_SORT_PATH \
    -mapper "python map_sort.py" \
    -reducer "python red_sort.py" \
    -jobconf "mapred.reduce.tasks=1" \
    -file ./map_sort.py \
    -file ./red_sort.py

Run the script: bash run.sh

View the result: hadoop fs -cat /output_sort/p* | head

That is a complete MapReduce sort for a small amount of data.

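What about large data volumes?
Then several reducers have to work concurrently, i.e. the number of reduce tasks must be raised above the single one configured with -jobconf "mapred.reduce.tasks=1". With the default partitioner, however, each reducer receives an arbitrary subset of the keys, so every output file is sorted on its own but the files are not ordered relative to one another.

Suppose there are 2 reduce tasks, with keys 1-50 handled by one reducer and keys 51-100 by the other.
That is where bucket sort comes in.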
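
The idea of giving each reducer a contiguous key range can be sketched as follows. The helpers bucket_index and bucket_sort, together with the parameters min_key, max_key and num_buckets, are hypothetical names introduced for illustration; the scripts in this post hard-code two buckets instead.

```python
def bucket_index(key, min_key, max_key, num_buckets):
    """Return the bucket (reducer) index for key.

    [min_key, max_key] is split into num_buckets contiguous ranges of
    (almost) equal width, using integer arithmetic only.
    """
    if not min_key <= key <= max_key:
        raise ValueError("key %r outside [%r, %r]" % (key, min_key, max_key))
    span = max_key - min_key + 1
    return (key - min_key) * num_buckets // span


def bucket_sort(keys, min_key, max_key, num_buckets):
    """Distribute keys into buckets, sort each bucket, concatenate in order."""
    buckets = [[] for _ in range(num_buckets)]
    for k in keys:
        buckets[bucket_index(k, min_key, max_key, num_buckets)].append(k)
    result = []
    for b in buckets:
        result.extend(sorted(b))  # each bucket could be sorted by its own reducer
    return result


if __name__ == "__main__":
    print(bucket_index(50, 1, 100, 2))  # 0: keys 1-50 fall into the first bucket
    print(bucket_index(51, 1, 100, 2))  # 1: keys 51-100 fall into the second
    print(bucket_sort([51, 3, 100, 50, 1], 1, 100, 2))
```

Because every key in bucket i is smaller than every key in bucket i+1, concatenating the individually sorted buckets yields a globally sorted sequence.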

III. Concurrent sorting with buckets

3.1 map_sort.py

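#!/usr/local/bin/python
# Mapper for the bucketed sort: emit "red_idx<TAB>new_key<TAB>val".

import sys

base_count = 10000

for line in sys.stdin:
    ss = line.strip().split('\t')
    key = ss[0]
    val = ss[1]

    new_key = base_count + int(key)

    # Shifted keys span 10000..10100; keys below the midpoint (10050)
    # get bucket index 0, all others bucket index 1.
    red_idx = 1
    if new_key < (10100 + 10000) / 2:
        red_idx = 0

    print("%s\t%s\t%s" % (red_idx, new_key, val))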

3.2 red_sort.py

#!/usr/local/bin/python
# Reducer for the bucketed sort: drop the bucket index and restore the key.

import sys

base_count = 10000

for line in sys.stdin:
    idx_id, key, val = line.strip().split('\t')

    new_key = int(key) - base_count
    print('\t'.join([str(new_key), val]))

3.3 run.sh

set -e -x

HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"

INPUT_FILE_PATH_A="/test/a.txt"
INPUT_FILE_PATH_B="/test/b.txt"

OUTPUT_SORT_PATH="/output_sort"

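# Remove the output of the previous run (-rmr is deprecated in Hadoop 2).
# -f keeps set -e from aborting when the directory does not exist yet.
$HADOOP_CMD fs -rm -r -f -skipTrash $OUTPUT_SORT_PATH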

# Step 3.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_A,$INPUT_FILE_PATH_B \
    -output $OUTPUT_SORT_PATH \
    -mapper "python map_sort.py" \
    -reducer "python red_sort.py" \
    -file ./map_sort.py \
    -file ./red_sort.py \
    -jobconf mapred.reduce.tasks=2 \
    -jobconf stream.num.map.output.key.fields=2 \
    -jobconf num.key.fields.for.partition=1 \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

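Run the script: bash run.sh

List the result: hadoop fs -ls /output_sort

The output directory now contains two part files, so the data was indeed split in two. With the threshold used in map_sort.py, keys 0-49 go to one reducer and keys 50-100 to the other.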
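
To see which keys end up in which bucket without a cluster, the job can be imitated locally. This is a simplified sketch, not the real Hadoop behaviour: the functions map_line and simulate are hypothetical, and records are simply grouped by the value of the first field, whereas KeyFieldBasedPartitioner picks the reducer by hashing that field.

```python
from collections import defaultdict

BASE = 10000
SPLIT = (10100 + 10000) // 2  # 10050, same threshold as in map_sort.py


def map_line(line):
    """Mimic the bucketed map_sort.py for a single input line."""
    key, val = line.strip().split("\t")
    new_key = BASE + int(key)
    red_idx = 0 if new_key < SPLIT else 1
    return "%s\t%s\t%s" % (red_idx, new_key, val)


def simulate(lines):
    """Return {red_idx: [output lines]} as each reducer would emit them."""
    partitions = defaultdict(list)
    for line in lines:
        mapped = map_line(line)
        partitions[mapped.split("\t")[0]].append(mapped)  # group on field 1
    output = {}
    for red_idx, records in partitions.items():
        out = []
        for rec in sorted(records):  # sort on the (red_idx, new_key) text key
            _, new_key, val = rec.split("\t")
            out.append("%s\t%s" % (int(new_key) - BASE, val))
        output[red_idx] = out
    return output


if __name__ == "__main__":
    sample = ["49\thadoop", "50\tjava", "100\tjava", "0\tjava"]
    for red_idx, out in sorted(simulate(sample).items()):
        print(red_idx, out)
```

In this simulation key 49 lands in bucket 0 and key 50 in bucket 1, which matches the threshold of 10050 in the mapper.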
