pyspark delta-lake-optimize无法解析sql

ifmq2ha2  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(498)

我使用spark 3.x和delta 0.7.x创建了一个delta表:

  1. data = spark.range(0, 5)
  2. data.write.format("delta").mode("overwrite").save("tmp/delta-table")
  3. # add some more files
  4. data = spark.range(20, 100)
  5. data.write.format("delta").mode("append").save("tmp/delta-table")
  6. df = spark.read.format("delta").load("tmp/delta-table")
  7. df.show()

现在在日志中生成了相当多的文件(很多情况下,Parquet文件太小)。

  1. %ls tmp/delta-table

我想压缩它们:

  1. df.createGlobalTempView("my_delta_table")
  2. spark.sql("OPTIMIZE my_delta_table ZORDER BY (id)")

失败原因:

  1. ParseException:
  2. mismatched input 'OPTIMIZE' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0)
  3. == SQL ==
  4. OPTIMIZE my_delta_table ZORDER BY (id)
  5. ^^^

问题:
我怎样才能让它工作(优化)而不失败的查询
有没有比调用基于文本的sql更原生的api?
注意:

  1. spark is started like this:
  2. import pyspark
  3. from pyspark.sql import SparkSession
  4. spark = pyspark.sql.SparkSession.builder.appName("MyApp") \
  5. .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0") \
  6. .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
  7. .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
  8. .getOrCreate()
  9. from delta.tables import *
zbsbpyhn

zbsbpyhn1#

OPTIMIZE 在oss delta lake中不可用。如果要压缩文件,可以按照“压缩文件”部分中的说明进行操作。如果你想用 ZORDER ,当前需要使用databricks运行时。

5lwkijsr

5lwkijsr2#

如果您在本地运行delta,这意味着您必须使用oss delta-lake。“优化”命令仅适用于databricks delta lake。要在oss中进行文件压缩,可以执行以下操作-https://docs.delta.io/latest/best-practices.html#compact-文件

相关问题