试图理解hive分区与spark分区之间的关系,最终导致了一个关于连接的问题。
我有两个外部Hive表;都由s3 bucket支持,并由 date
; 因此,在每个bucket中都有具有名称格式的键 date=<yyyy-MM-dd>/<filename>
.
问题1:
如果我把这些数据读入spark:
val table1 = spark.table("table1").as[Table1Row]
val table2 = spark.table("table2").as[Table2Row]
那么结果数据集将分别有多少个分区?分区等于s3中的对象数?
问题2:
假设这两个行类型具有以下架构:
Table1Row(date: Date, id: String, ...)
Table2Row(date: Date, id: String, ...)
我想加入 table1
以及 table2
在田野上 date
以及 id
:
table1.joinWith(table2,
table1("date") === table2("date") &&
table1("id") === table2("id")
)
spark是否能够利用这样一个事实,即连接的字段之一是配置单元表中的分区键来优化连接?如果是的话怎么办?
问题3:
假设现在我正在使用 RDD
而是:
val rdd1 = table1.rdd
val rdd2 = table2.rdd
afaik,使用 RDD
api看起来像:
rdd1.map(row1 => ((row1.date, row1.id), row1))
.join(rdd2.map(row2 => ((row2.date, row2.id), row2))))
同样,spark是否能够利用hive表中的分区键正在join中使用的事实?
2条答案
按热度按时间von4xj4u1#
一般来说,
spark分区—大型分布式数据集的(逻辑)块。spark为单个分区生成单个任务,该分区将在executor jvm中运行。
配置单元分区是一种通过基于分区键(列)将表划分为不同部分来将表组织为分区的方法。分区使访问数据更简单、更清晰。
可以调整的配置很少-
spark.sql.files.maxPartitionBytes
-读取文件时要打包到单个分区的最大字节数(默认128mb)spark.sql.files.openCostInBytes
-打开一个文件的估计成本,以可以同时扫描的字节数来衡量。将多个文件放入一个分区时使用。最好是高估,这样小文件的分区会比大文件的分区(先调度)快(默认值(4 mb)spark.sql.shuffle.partitions
-配置为联接或聚合洗牌数据时要使用的分区数(默认为200)czfnxgou2#
那么结果数据集将分别有多少个分区?分区等于s3中的对象数?
无法回答你提供的信息。最新版本中的分区数主要取决于
spark.sql.files.maxPartitionByte
,尽管其他因素也能起到一定的作用。spark是否能够利用这样一个事实,即连接的字段之一是配置单元表中的分区键来优化连接?
现在还没有(spark 2.3.0),但是spark可以使用bucketing(
DISTRIBUTE BY
)优化联接。请参见如何定义Dataframe的分区?。一旦数据源APIv2稳定下来,这种情况在将来可能会改变。假设现在我再次使用rdds(…),spark是否能够利用hive表中的分区键在join中使用的事实?
一点也不。即使数据是绑定的rdd转换和函数
Dataset
变换是黑盒。无法应用任何优化,请在此应用。