Scala: Spark SQL bucketing optimization does not seem to take effect

Posted by 3duebb1j on 2024-01-08 in Scala

I am using Spark 3.x, and I have the following simple query for learning the Spark SQL bucketing feature.

test("bucket join 1") {
    val spark = SparkSession.builder().master("local").enableHiveSupport().appName("test join 1").config("spark.sql.codegen.wholeStage", "false").getOrCreate()
    import spark.implicits._
    val data1 = (0 to 100).map {
      i => (i, ('A' + i % 6).asInstanceOf[Char].toString)
    }

    val t1 = "t_" + System.currentTimeMillis()
    data1.toDF("a", "b").write.bucketBy(2, "b").saveAsTable(t1)

    val data2 = (0 to 5).map {
      i => (('A' + i % 6).asInstanceOf[Char].toString, ('A' + i % 6).asInstanceOf[Char].toString)
    }

    val t2 = "t_" + System.currentTimeMillis()
    data2.toDF("a", "b").write.bucketBy(2, "b").saveAsTable(t2)

    val df = spark.sql(
      s"""

      select  t1.a ,t1.b,t2.a, t2.b from $t1 t1 join $t2 t2 on t1.b = t2.b

      """.stripMargin(' '))

    df.explain(true)

    df.show(truncate = false)

    spark.sql(s"describe extended $t1 ").show(truncate = false)
    spark.sql(s"describe extended $t2 ").show(truncate = false)

    
  }

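When I run the code above, it prints the physical plan shown below, which apparently involves a shuffle, so I think bucketing is not working here.
In the output of the describe command, I can see that both tables have a bucket spec defined: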

Bucket Columns: b
Num Buckets: 2


I am not sure what I am missing. I am not getting the expected result (the shuffle should be avoided):

BroadcastHashJoin [b#27], [b#29], Inner, BuildRight
:- Project [a#26, b#27]
:  +- Filter isnotnull(b#27)
:     +- FileScan parquet default.t_1700986792232[a#26,b#27] Batched: false, DataFilters: [isnotnull(b#27)], Format: Parquet, Location: InMemoryFileIndex[file:/D://spark-warehouse/t_17009867..., PartitionFilters: [], PushedFilters: [IsNotNull(b)], ReadSchema: struct<a:int,b:string>, SelectedBucketsCount: 2 out of 2
+- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true])), [id=#35]
   +- Project [a#28, b#29]
      +- Filter isnotnull(b#29)
         +- FileScan parquet default.t_1700986813106[a#28,b#29] Batched: false, DataFilters: [isnotnull(b#29)], Format: Parquet, Location: InMemoryFileIndex[file:/D:/spark-warehouse/t_17009868..., PartitionFilters: [], PushedFilters: [IsNotNull(b)], ReadSchema: struct<a:string,b:string>, SelectedBucketsCount: 2 out of 2

41zrol4v


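The size of your DataFrames leads Catalyst to decide that a broadcast hash join is the better approach: the smaller table falls below spark.sql.autoBroadcastJoinThreshold (10 MB by default), so the planner broadcasts it instead of relying on the bucketed layout. That is simply how the join-selection algorithm works.
1. Try spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1); bucketing is then used.
2. Use 1M rows for both DataFrames; bucketing is then used.
As you can see in the plan below, the join becomes a SortMergeJoin over bucketed scans with no Exchange node.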

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [b#573], [b#575], Inner
   :- Sort [b#573 ASC NULLS FIRST], false, 0
   :  +- Filter isnotnull(b#573)
   :     +- FileScan parquet spark_catalog.default.t_1701024653715[a#572,b#573] Batched: false, Bucketed: true, DataFilters: [isnotnull(b#573)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[dbfs:/user/hive/warehouse/t_1701024653715], PartitionFilters: [], PushedFilters: [IsNotNull(b)], ReadSchema: struct<a:int,b:string>, SelectedBucketsCount: 2 out of 2
   +- Sort [b#575 ASC NULLS FIRST], false, 0
      +- Filter isnotnull(b#575)
         +- FileScan parquet spark_catalog.default.t_1701024668124[a#574,b#575] Batched: false, Bucketed: true, DataFilters: [isnotnull(b#575)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[dbfs:/user/hive/warehouse/t_1701024668124], PartitionFilters: [], PushedFilters: [IsNotNull(b)], ReadSchema: struct<a:string,b:string>, SelectedBucketsCount: 2 out of 2
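
The first suggestion can be sketched as a small standalone program. This is a minimal sketch, not the answerer's exact code: the object name `BucketJoinCheck`, the table-name prefixes, and the string check for `Exchange hashpartitioning` are assumptions made for illustration. It assumes Spark 3.x on the classpath and a writable local warehouse directory.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper object, not part of the original question or answer.
object BucketJoinCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("bucket-join-check")
      .getOrCreate()
    import spark.implicits._

    // Disable automatic broadcast joins so the planner cannot pick BroadcastHashJoin
    // just because one side is small.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1L)

    // Use one suffix with distinct prefixes so the two table names cannot collide.
    val suffix = System.nanoTime()
    val left = s"bucket_left_$suffix"
    val right = s"bucket_right_$suffix"

    // Same data shape as in the question: both tables bucketed into 2 buckets on "b".
    (0 to 100).map(i => (i, ('A' + i % 6).toChar.toString))
      .toDF("a", "b").write.bucketBy(2, "b").saveAsTable(left)
    (0 to 5).map(i => (('A' + i).toChar.toString, ('A' + i).toChar.toString))
      .toDF("a", "b").write.bucketBy(2, "b").saveAsTable(right)

    val joined = spark.table(left).as("l")
      .join(spark.table(right).as("r"), $"l.b" === $"r.b")

    // Print the physical plan and do a rough textual check for a shuffle exchange.
    val plan = joined.queryExecution.executedPlan.toString
    println(plan)
    println(s"plan contains a hash-partitioning exchange: ${plan.contains("Exchange hashpartitioning")}")

    spark.stop()
  }
}
```

If bucketing is being used, the printed plan should resemble the one above: bucketed file scans feeding the join directly, without an Exchange node in between. The string check is only a convenience; reading the plan itself is more reliable.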

You may also want to consider the Delta format, which supports Z-Ordering.
