它们之间的关系

2vuwiymt  于 2021-06-26  发布在  Hive
关注(0)|答案(2)|浏览(445)

试图理解hive分区与spark分区之间的关系,最终导致了一个关于连接的问题。
我有两个外部Hive表;都由s3 bucket支持,并由 date ; 因此,在每个bucket中都有具有名称格式的键 date=<yyyy-MM-dd>/<filename> .
问题1:
如果我把这些数据读入spark:

val table1 = spark.table("table1").as[Table1Row]
val table2 = spark.table("table2").as[Table2Row]

那么结果数据集将分别有多少个分区?分区等于s3中的对象数?
问题2:
假设这两个行类型具有以下架构:

Table1Row(date: Date, id: String, ...)
Table2Row(date: Date, id: String, ...)

我想加入 table1 以及 table2 在田野上 date 以及 id :

table1.joinWith(table2,
  table1("date") === table2("date") && 
    table1("id") === table2("id")
)

spark是否能够利用这样一个事实,即连接的字段之一是配置单元表中的分区键来优化连接?如果是的话怎么办?
问题3:
假设现在我正在使用 RDD 而是:

val rdd1 = table1.rdd
val rdd2 = table2.rdd

afaik,使用 RDD api看起来像:

rdd1.map(row1 => ((row1.date, row1.id), row1))
  .join(rdd2.map(row2 => ((row2.date, row2.id), row2))))

同样,spark是否能够利用hive表中的分区键正在join中使用的事实?

von4xj4u

von4xj4u1#

一般来说,
spark分区—大型分布式数据集的(逻辑)块。spark为单个分区生成单个任务,该分区将在executor jvm中运行。
配置单元分区是一种通过基于分区键(列)将表划分为不同部分来将表组织为分区的方法。分区使访问数据更简单、更清晰。
可以调整的配置很少- spark.sql.files.maxPartitionBytes -读取文件时要打包到单个分区的最大字节数(默认128mb) spark.sql.files.openCostInBytes -打开一个文件的估计成本,以可以同时扫描的字节数来衡量。将多个文件放入一个分区时使用。最好是高估,这样小文件的分区会比大文件的分区(先调度)快(默认值(4 mb) spark.sql.shuffle.partitions -配置为联接或聚合洗牌数据时要使用的分区数(默认为200)

czfnxgou

czfnxgou2#

那么结果数据集将分别有多少个分区?分区等于s3中的对象数?
无法回答你提供的信息。最新版本中的分区数主要取决于 spark.sql.files.maxPartitionByte ,尽管其他因素也能起到一定的作用。
spark是否能够利用这样一个事实,即连接的字段之一是配置单元表中的分区键来优化连接?
现在还没有(spark 2.3.0),但是spark可以使用bucketing( DISTRIBUTE BY )优化联接。请参见如何定义Dataframe的分区?。一旦数据源APIv2稳定下来,这种情况在将来可能会改变。
假设现在我再次使用rdds(…),spark是否能够利用hive表中的分区键在join中使用的事实?
一点也不。即使数据是绑定的rdd转换和函数 Dataset 变换是黑盒。无法应用任何优化,请在此应用。

相关问题