如何在sparksql中控制分区大小

t3psigkw  于 2021-06-29  发布在  Hive
关注(0)|答案(3)|浏览(757)

我需要使用sparksql从配置单元表加载数据 HiveContext 并加载到hdfs中。默认情况下 DataFrame 从sql输出有2个分区。为了获得更多的并行性,我需要更多的sql分区。中没有重载方法 HiveContex t取分区数参数。
rdd的重新分区会导致洗牌并导致更多的处理时间。

val result = sqlContext.sql("select * from bt_st_ent")

日志输出为:

Starting task 0.0 in stage 131.0 (TID 297, aster1.com, partition 0,NODE_LOCAL, 2203 bytes)
Starting task 1.0 in stage 131.0 (TID 298, aster1.com, partition 1,NODE_LOCAL, 2204 bytes)

我想知道有没有办法增加sql输出的分区大小。

r8xiu3jd

r8xiu3jd1#

Spark<2.0:
您可以使用hadoop配置选项: mapred.min.split.size . mapred.max.split.size 以及hdfs块大小来控制基于文件系统格式的分区大小*。

val minSplit: Int = ???
val maxSplit: Int = ???

sc.hadoopConfiguration.setInt("mapred.min.split.size", minSplit)
sc.hadoopConfiguration.setInt("mapred.max.split.size", maxSplit)

spark 2.0+:
你可以用 spark.sql.files.maxPartitionBytes 配置:

spark.conf.set("spark.sql.files.maxPartitionBytes", maxSplit)

在这两种情况下,这些值可能不会被特定的数据源api使用,因此您应该始终检查所使用格式的文档/实现详细信息。

  • 其他输入格式可以使用不同的设置。参见示例

通过jdbc读取rdbms时spark中的分区
mapreduce split和spark parition的区别
此外 Datasets 创建自 RDDs 将从其父级继承分区布局。
类似地,bucked表将使用metastore中定义的bucket布局,bucket和bucket之间的关系为1:1 Dataset 分区。

nxagd54h

nxagd54h2#

如果您的sql执行shuffle(例如它有一个join或某种groupby),您可以通过设置'spark.sql.shuffle.partitions'属性来设置分区数

sqlContext.setConf( "spark.sql.shuffle.partitions", 64)

按照fokko的建议,你可以用一个随机变量来聚类。

val result = sqlContext.sql("""
   select * from (
     select *,random(64) as rand_part from bt_st_ent
   ) cluster by rand_part""")
rhfm7lfc

rhfm7lfc3#

一个非常普遍和痛苦的问题。您应该寻找在统一分区中分发数据的密钥。你可以用 DISTRIBUTE BY 以及 CLUSTER BY 运算符来告诉spark将分区中的行分组。这将导致查询本身的一些开销。但会导致大小均匀的分区。deepsense对此有一个非常好的教程。

相关问题