我需要使用sparksql从配置单元表加载数据 HiveContext
并加载到hdfs中。默认情况下 DataFrame
从sql输出有2个分区。为了获得更多的并行性,我需要更多的sql分区。中没有重载方法 HiveContex
t取分区数参数。
rdd的重新分区会导致洗牌并导致更多的处理时间。
val result = sqlContext.sql("select * from bt_st_ent")
日志输出为:
Starting task 0.0 in stage 131.0 (TID 297, aster1.com, partition 0,NODE_LOCAL, 2203 bytes)
Starting task 1.0 in stage 131.0 (TID 298, aster1.com, partition 1,NODE_LOCAL, 2204 bytes)
我想知道有没有办法增加sql输出的分区大小。
3条答案
按热度按时间r8xiu3jd1#
Spark<2.0:
您可以使用hadoop配置选项:
mapred.min.split.size
.mapred.max.split.size
以及hdfs块大小来控制基于文件系统格式的分区大小*。spark 2.0+:
你可以用
spark.sql.files.maxPartitionBytes
配置:在这两种情况下,这些值可能不会被特定的数据源api使用,因此您应该始终检查所使用格式的文档/实现详细信息。
通过jdbc读取rdbms时spark中的分区
mapreduce split和spark parition的区别
此外
Datasets
创建自RDDs
将从其父级继承分区布局。类似地,bucked表将使用metastore中定义的bucket布局,bucket和bucket之间的关系为1:1
Dataset
分区。nxagd54h2#
如果您的sql执行shuffle(例如它有一个join或某种groupby),您可以通过设置'spark.sql.shuffle.partitions'属性来设置分区数
按照fokko的建议,你可以用一个随机变量来聚类。
rhfm7lfc3#
一个非常普遍和痛苦的问题。您应该寻找在统一分区中分发数据的密钥。你可以用
DISTRIBUTE BY
以及CLUSTER BY
运算符来告诉spark将分区中的行分组。这将导致查询本身的一些开销。但会导致大小均匀的分区。deepsense对此有一个非常好的教程。