hadoop 配置单元数据检索查询:CLUSTER BY、ORDER BY和SORT BY之间的区别

mkshixfv  于 2023-06-21  发布在  Hadoop
关注(0)|答案(8)|浏览(175)

在Hive上,用于数据检索查询(例如SELECT ...)、NOT数据定义(例如CREATE TABLES ...),据我所知:

  • SORT BY仅在reducer中使用排序
  • ORDER BY在全球订购东西,但将所有东西都塞进一个减速器中
  • CLUSTER BY智能地将数据通过键散列分配到reducer中,并进行排序

所以我的问题是

  1. CLUSTER BY能保证全球秩序吗?
  2. DISTRIBUTE BY将相同的键放入相同的reducer中,但是相邻的键呢?
    我能找到的唯一文档是here,从示例中看,它似乎是全局排序的。但从定义上看,我觉得它并不总是这样。
rpppsulh

rpppsulh1#

简而言之,对于您的问题:

较长版本:
要理解Hive,首先你必须理解Map Reduce。Map Reduce对于这个问题的相关属性是,当数据到达reducing阶段时,它们总是在reducing步骤之前在mapper或reducer上进行 Shuffle 和排序。然后:

  • ORDER BY x:数据通过使用一个reducer进行最后一个reducing步骤来保证x的全局排序。对于大输出,这将是缓慢的。然而,即使对于处理大型数据集,只要您在最后一步知道数据大小很小,它也是可以接受的。(例如,在过滤和聚合之后)
  • SORT BY x:保证在N个reducer中的每个reducer上 * 具体地 * 通过x进行局部排序。不保证全局排序。
  • DISTRIBUTE BY x:保证相同分发密钥x的行进入相同的reducer,但不保证您想要的顺序。还原阶段的数据仍然排序,但不一定按x排序。
  • CLUSTER BY x:这与执行(DISTRIBUTE BY xSORT BY x)相同。

详细机制和语法请参考官方wiki:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+SortBy
编辑:如评论和其他答案中所述,之前的答案是不正确的,词语的选择可能会增加错误的细微差别。短语overlapping-range的使用是误导性的,因为 Shuffle 是用散列函数完成的,而不一定是按范围。例如,根据实现方式,通过偶数或奇数对整数进行散列是有效的,这在数字范围方面是重叠的。在这种情况下,DISTRIBUTE BY只保证所有偶数进入同一个reducer,所有奇数进入另一个reducer。
其他答案有小错误,或与问题无关,或过于冗长。

vc9ivgsu

vc9ivgsu2#

让我先澄清一下:clustered by只将你的key分配到不同的bucket中,clustered by ... sorted by将bucket排序。
通过一个简单的实验(见下文),您可以看到默认情况下不会获得全局顺序。原因是默认的分区程序使用散列代码分割键,而不管实际的键顺序。
但是,你可以让你的数据完全有序。
动机是“Hadoop:《权威指南》(The Definitive Guide),汤姆·怀特(Tom White)著,第3版,第8章。274,Total Sort),在那里他讨论了TotalOrderPartitioner。
我将首先回答您的TotalOrdering问题,然后描述我所做的几个与排序相关的Hive实验。
请记住:我在这里描述的是一个“概念证明”,我能够使用Claudera的CDH 3发行版处理一个例子。
最初,我希望org.apache.hadoop.mapred.lib.TotalOrderPartitioner能够完成这个任务。不幸的是,它没有,因为它看起来像是Hive分区的值,而不是键。所以我修补它(应该有子类,但我没有时间):
更换

public int getPartition(K key, V value, int numPartitions) {
    return partitions.findPartition(key);
}

public int getPartition(K key, V value, int numPartitions) {
    return partitions.findPartition(value);
}

现在你可以将TotalOrderPartitioner设置为你的Hive分区:

hive> set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

hive> set total.order.partitioner.natural.order=false

hive> set total.order.partitioner.path=/user/yevgen/out_data2

我还用了

hive> set hive.enforce.bucketing = true; 

hive> set mapred.reduce.tasks=4;

在我的测试中。
文件out_data2告诉TotalOrderPartitioner如何存储值。通过对数据进行采样生成out_data2。在我的测试中,我使用了4个桶和从0到10的键。我使用ad-hoc方法生成了out_data2:

import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.fs.FileSystem;

public class TotalPartitioner extends Configured implements Tool{
    public static void main(String[] args) throws Exception{
            ToolRunner.run(new TotalPartitioner(), args);
    }

    @Override
    public int run(String[] args) throws Exception {
        Path partFile = new Path("/home/yevgen/out_data2");
        FileSystem fs = FileSystem.getLocal(getConf());

        HiveKey key = new HiveKey();
        NullWritable value = NullWritable.get();

        SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf(), partFile, HiveKey.class, NullWritable.class);
        key.set( new byte[]{1,3}, 0, 2);//partition at 3; 1 came from Hive -- do not know why
        writer.append(key, value);
        key.set( new byte[]{1, 6}, 0, 2);//partition at 6
        writer.append(key, value);
        key.set( new byte[]{1, 9}, 0, 2);//partition at 9
        writer.append(key, value);
        writer.close();
        return 0;
    }

}

然后我将结果out_data2复制到HDFS(到/user/yevgen/out_data2)
通过这些设置,我对数据进行了分桶/排序(参见我的实验列表中的最后一项)。
这是我的实验。

  • 创建示例数据

bash> echo -e“1\n3\n2\n4\n5\n7\n6\n8\n9\n0”> data.txt

  • 创建基本测试表:

int x = int n(int x); hive>将数据本地inpath 'data.txt'加载到表test中;
基本上,该表包含从0到9的值,没有顺序。

  • 演示表复制是如何工作的(实际上是mapred.reduce.tasks参数,它设置要使用的reduce任务的最大数量)

hive> create table test2(x int);
hive> set mapred.reduce.tasks=4;
hive> insert overwrite table test2 select a.x from test a join test b on a.x=b.x;-- stupied join强制非平凡map-reduce
bash> hadoop fs -cat /user/hive/warehouse/test2/000001_0
1
5
9

  • 演示桶化。你可以看到键是随机分配的,没有任何排序顺序:

hive> create table test3(x int)clustered by(x)into 4 bucket;
hive> set hive.enforce.bucketing = true;
hive> insert overwrite table test3 select * from test;
bash> hadoop fs -cat /user/hive/warehouse/test3/000000_0
4
8
0

  • 带排序的分桶。结果是部分排序的,而不是完全排序的

hive> create table test4(x int)clustered by(x)sorted by(x desc)into 4 bucket;
hive> insert overwrite table test4 select * from test;
bash> hadoop fs -cat /user/hive/warehouse/test4/000001_0
1
5
9
您可以看到值是按升序排序的。CDH 3中的Hivebug?

  • 在没有cluster by语句的情况下进行部分排序:

hive> create table test5 as select x from test distributed by x sort by x desc;
bash> hadoop fs -cat /user/hive/warehouse/test5/000001_0
9
5
1

  • 使用我的修补程序TotalOrderParitioner:

hive> set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
hive> set total.order.partitioner.natural.order=false
hive> set total.order.partitioner.path=/user/training/out_data2
hive> create table test6(x int)clustered by(x)sorted by(x)into 4 bucket;
hive> insert overwrite table test6 select * from test;
bash> hadoop fs -cat /user/hive/warehouse/test6/000000_0
1
2
0
bash> hadoop fs -cat /user/hive/warehouse/test6/000001_0
3
4
5
bash> hadoop fs -cat /user/hive/warehouse/test6/000002_0
7
6
8
bash> hadoop fs -cat /user/hive/warehouse/test6/000003_0
9

7gcisfzg

7gcisfzg3#

CLUSTER BY不生成全局排序。
公认的答案(由Lars Yencken提出)误导了我们,因为它说归约器将得到不重叠的范围。正如Anton Zaviriukhin正确指出的BucketedTables文档,CLUSTER BY基本上是DISTRIBUTE BY(与bucketing相同)加上每个bucket/reducer中的SORT BY。DISTRIBUTE BY只是将哈希和mod散列到桶中,而哈希函数may保持顺序(如果i > j,则i的哈希> j的哈希),而哈希值的mod不保持顺序。
这里有一个更好的例子显示重叠的范围
http://myitlearnings.com/bucketing-in-hive/

xmjla07d

xmjla07d4#

据我所知,简短的回答是否定的。你会得到重叠的范围。
来自SortBy文档:“Cluster By是Distribute By和Sort By的快捷方式。”“具有相同Distribute By列的所有行都将进入同一个reducer。”但是没有信息表明Distribute by保证不重叠的范围。
此外,从DDL BucketedTables documentation:“Hive如何在存储桶中分配行?一般来说,bucket数由表达式hash_function(bucketing_column)mod num_buckets确定。”我假设Select语句中的Cluster by使用相同的原则在reducer之间分配行,因为它的主要用途是用数据填充bucketed表。
我创建了一个包含1个整数列“a”的表,并在其中插入了0到9的数字。
然后我将reducers的数量设置为2 set mapred.reduce.tasks = 2;
和此表中带有Cluster by子句select * from my_tab cluster by a;select数据
并得到了预期的结果:

0
2
4
6
8
1
3
5
7
9

所以,第一个reducer(0号)得到偶数(因为他们的模式2给出0)
而第二个reducer(1号)得到奇数(因为它们的模式2给出1)
这就是“分发者”的工作原理。
然后“排序依据”对每个reducer中的结果进行排序。

yqkkidmi

yqkkidmi5#

用例:当有一个大的数据集时,应该使用sort by,因为在sort by中,所有的set reducer在聚集在一起之前对数据进行内部排序,这提高了性能。而在Order by中,较大数据集的性能会降低,因为所有数据都通过单个reducer传递,这会增加负载,因此需要更长的时间来执行查询。请参见下面的11节点群集示例。

这个是Order By example outputx 1c 1d 1x
这一个是Sort By示例输出

这是Cluster By example

据我观察,排序方式、聚类方式和分布方式的数据相同,但内部机制不同。在分销商中:相同的列行将转到一个reducer,例如。DISTRIBUTE BY(城市)-班加罗尔数据在一列中,德里数据在一个减速器中:

pxy2qtax

pxy2qtax6#

聚类依据是按缩减器排序,而不是全局排序。在许多书中,它也被错误地或令人困惑地提到。它有特殊的用途,比如说你把每个部门分配给特定的reducer,然后按每个部门的员工姓名排序,而不关心要使用的集群的部门顺序,当工作量在reducer之间分配时,它会更有性能。

nfeuvbwi

nfeuvbwi7#

SortBy:N个或多个具有重叠范围的排序文件。
OrderBy:单次输出,即完全排序。
分发人:通过保护N个reducer中的每一个来获得列的非重叠范围,但不对每个reducer的输出进行排序。
更多信息http://commandstech.com/hive-sortby-vs-orderby-vs-distributeby-vs-clusterby/
ClusterBy:参考上面的例子,如果我们使用Cluster By x,两个reducer将进一步对x上的行进行排序:

zbsbpyhn

zbsbpyhn8#

如果我没理解错的话

  1. sort by -只对reducer中的数据进行排序
  2. order by -通过将整个数据集推到单个reducer来全局排序。如果我们确实有大量数据(偏斜),这个过程将花费大量时间。
  3. cluster by -根据key hash智能地将数据分配到reducer中,并进行排序,但不授予全局排序。一个键(k1)可以放入两个减速器中。第一个reducer得到10 K K1数据,第二个可能得到1 K k1数据。

相关问题