我用spark 2.3.1启动spark shell,参数如下: --master='local[*]'
--executor-memory=6400M --driver-memory=60G
--conf spark.sql.autoBroadcastJoinThreshold=209715200 --conf spark.sql.shuffle.partitions=1000
--conf spark.local.dir=/data/spark-temp --conf spark.driver.extraJavaOptions='-Dderby.system.home=/data/spark-catalog/'
然后用sort和bucket创建两个配置单元表
第一个表名-表1
第二个表名-表2
val storagePath = "path_to_orc"
val storage = spark.read.orc(storagePath)
val tableName = "table1"
sql(s"DROP TABLE IF EXISTS $tableName")
storage.select($"group", $"id").write.bucketBy(bucketsCount, "id").sortBy("id").saveAsTable(tableName)
(表2代码相同)
我希望当我用另一个df连接这些表时,在查询计划中没有不必要的交换步骤
然后我关掉广播使用sortmergejoin
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1)
我吃点东西
val sample = spark.read.option("header", "true).option("delimiter", "\t").csv("path_to_tsv")
val m = spark.table("table1")
sample.select($"col" as "id").join(m, Seq("id")).explain()
== Physical Plan ==
* (4) Project [id#24, group#0]
+- *(4) SortMergeJoin [id#24], [id#1], Inner
:- *(2) Sort [id#24 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#24, 1000)
: +- *(1) Project [col#21 AS id#24]
: +- *(1) Filter isnotnull(col#21)
: +- *(1) FileScan csv [col#21] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/samples/sample-20K], PartitionFilters: [], PushedFilters: [IsNotNull(col)], ReadSchema: struct<col:string>
+- *(3) Project [group#0, id#1]
+- *(3) Filter isnotnull(id#1)
+- *(3) FileScan parquet default.table1[group#0,id#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/data/table1], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<group:string,id:string>
但当我在连接前对两个表使用并集时
val m2 = spark.table("table2")
val mUnion = m union m2
sample.select($"col" as "id").join(mUnion, Seq("id")).explain()
== Physical Plan ==
* (6) Project [id#33, group#0]
+- *(6) SortMergeJoin [id#33], [id#1], Inner
:- *(2) Sort [id#33 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#33, 1000)
: +- *(1) Project [col#21 AS id#33]
: +- *(1) Filter isnotnull(col#21)
: +- *(1) FileScan csv [col#21] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/samples/sample-20K], PartitionFilters: [], PushedFilters: [IsNotNull(col)], ReadSchema: struct<col:string>
+- *(5) Sort [id#1 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#1, 1000)
+- Union
:- *(3) Project [group#0, id#1]
: +- *(3) Filter isnotnull(id#1)
: +- *(3) FileScan parquet default.membership_g043_append[group#0,id#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/data/table1], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<group:string,id:string>
+- *(4) Project [group#4, id#5]
+- *(4) Filter isnotnull(id#5)
+- *(4) FileScan parquet default.membership_g042[group#4,id#5] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/data/table2], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<group:string,id:string>
在本例中,出现了排序和分区(步骤5)
如何合并两个配置单元表而不进行排序和交换
1条答案
按热度按时间rlcwz9us1#
据我所知,spark在连接时不考虑排序,而只考虑分区。因此,为了获得有效的联接,必须按同一列进行分区。这是因为排序不能保证具有相同密钥的记录最终位于同一分区中。spark必须确保所有具有相同值的键从多个dataframes被洗牌到同一分区和同一执行器上。