不支持通过群集创建sparksql表

ux6nzvsh  于 2021-06-26  发布在  Hive
关注(0)|答案(1)|浏览(314)

根据spark文件https://spark.apache.org/docs/2.1.0/sql-programming-guide.html#supported-配置单元功能支持群集的配置单元语句。但是当我尝试使用beeline中的以下查询创建表时

CREATE TABLE set_bucketing_test (key INT, value STRING) CLUSTERED BY (key) INTO 10 BUCKETS;

我得到以下错误

Error: org.apache.spark.sql.catalyst.parser.ParseException:
Operation not allowed: CREATE TABLE ... CLUSTERED BY(line 1, pos 0)

不知道我犯了什么错。有什么帮助吗?

bvuwiixz

bvuwiixz1#

您可以利用sparksql中的cluster-by特性来创建表、连接表等,这在spark2.1+
看到了吗https://issues.apache.org/jira/browse/spark-15453
而且当前配置单元无法识别此功能,因为spark和配置单元之间的元数据不兼容,这就是为什么即使在配置单元端识别此表,也不能使用相同的语法,因为它会将所有列视为 array 以下示例可能会给您一些想法:

准备来源 val df = (0 until 80000).map(i => (i, i.toString, i.toString)).toDF("item_id", "country", "state").coalesce(1) ####从源创建两个bucket表

您将看到“which is not compatible with hive.”滚动到右侧

df.write.bucketBy(100, "country", "state").sortBy("country", "state").saveAsTable("kofeng.lstg_bucket_test")

17/03/13 15:12:01 WARN HiveExternalCatalog: Persisting bucketed data source table `kofeng`.`lstg_bucket_test` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.

df.write.bucketBy(100, "country", "state").sortBy("country", "state").saveAsTable("kofeng.lstg_bucket_test2")

加入他们并解释

由于音量较小,请先禁用广播连接。

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.sql.autoBroadcastJoinThreshold", "0").getOrCreate()

该方案在spark2.1.0中避免交换和排序,在spark2.0中避免交换,只有过滤和扫描才能证明数据的局部性利用。

val query = """
 |SELECT *
 |FROM
 |  kofeng.lstg_bucket_test a
 |JOIN
 |  kofeng.lstg_bucket_test2 b
 |ON a.country=b.country AND
 |   a.state=b.state
      """.stripMargin
val joinDF = sql(query)

    scala> joinDF.queryExecution.executedPlan
    res10: org.apache.spark.sql.execution.SparkPlan =

* SortMergeJoin [country#71, state#72], [country#74, state#75], Inner

:- *Project [item_id#70, country#71, state#72]
:  +- *Filter (isnotnull(country#71) && isnotnull(state#72))
:     +- *FileScan parquet kofeng.lstg_bucket_test[item_id#70,country#71,state#72] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ares-lvs-nn-ha/user/hive/warehouse/kofeng.db/lstg_bucket_test], PartitionFilters: [], PushedFilters: [IsNotNull(country), IsNotNull(state)], ReadSchema: struct<item_id:int,country:int,state:string>
+- *Project [item_id#73, country#74, state#75]
   +- *Filter (isnotnull(country#74) && isnotnull(state#75))
      +- *FileScan parquet kofeng.lstg_bucket_test2[item_id#73,country#74,state#75] Batched: true, Format: Parquet...

相关问题