hive 冰山表快照未过期

xiozqbni  于 2023-11-18  发布在  Hive
关注(0)|答案(1)|浏览(158)

我使用Dataroc Metastore,Dataproc Batch和Pyspark。虽然我使用GCP,我相信这是一般的Apache冰山问题。
我运行我的Spark作业,并创建了冰山旅行表与自动快照过期在1小时history.expire.max-snapshot-age-ms=3600000和写从CSV文件的内容到表。

  1. conf = (
  2. SparkConf()
  3. .setAppName('read_from_iceberg')
  4. .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
  5. .set('spark.sql.catalog.spark_catalog', 'org.apache.iceberg.spark.SparkSessionCatalog')
  6. .set('spark.sql.catalog.spark_catalog.type', 'hive')
  7. .set(f'spark.sql.catalog.dev', 'org.apache.iceberg.spark.SparkCatalog')
  8. .set(f'spark.sql.catalog.dev.type', 'hive')
  9. .set(f'spark.sql.warehouse.dir', lake_bucket)
  10. )
  11. spark = SparkSession.builder.enableHiveSupport().config(conf=conf).getOrCreate()
  12. schema = StructType([
  13. StructField("vendor_id", LongType(), True),
  14. StructField("trip_id", LongType(), True),
  15. StructField("trip_distance", FloatType(), True),
  16. StructField("fare_amount", DoubleType(), True),
  17. StructField("store_and_fwd_flag", StringType(), True)
  18. ])
  19. # Create database if not exists
  20. spark.sql("CREATE DATABASE IF NOT EXISTS dev.lakehouse")
  21. # Create table if doesn't exist.
  22. # df = spark.createDataFrame([], schema)
  23. df.writeTo("dev.lakehouse.trips4").partitionedBy("vendor_id")
  24. .tableProperty('format-version', '2')
  25. .tableProperty("history.expire.max-snapshot-age-ms","3600000").createOrReplace())
  26. df3 = spark.read.option("delimiter", ",").schema(schema).option("header", True).csv(
  27. "gs://my-super-bucket/csv-input/bulk/*")
  28. df3.write.mode('append').format("iceberg").insertInto("dev.lakehouse.trips")

字符串
我重复了几次批处理执行,结果我有4500万个写对象。

  1. +---------+
  2. | count(1)|
  3. +---------+
  4. |450000000|
  5. +---------+


现在我想看看table的历史。

  1. spark.sql("SELECT * FROM dev.lakehouse.trips4.history").show()


结果如下:

  1. +--------------------+-------------------+-------------------+-------------------+
  2. | made_current_at| snapshot_id| parent_id|is_current_ancestor|
  3. +--------------------+-------------------+-------------------+-------------------+
  4. |2023-11-08 09:05:...|3365635318905728444| null| true|
  5. |2023-11-08 09:07:...|8818173850344394660|3365635318905728444| true|
  6. |2023-11-08 09:18:...|7080281147456503211|8818173850344394660| true|
  7. |2023-11-08 09:26:...|1124704647664806615|7080281147456503211| true|
  8. |2023-11-08 09:43:...|1410379929547885531|1124704647664806615| true|
  9. |2023-11-08 09:44:...|2828602979849095888|1410379929547885531| true|
  10. |2023-11-08 11:59:...|3836167930220261494|2828602979849095888| true|
  11. |2023-11-08 12:09:...|7872321137982208330|3836167930220261494| true|
  12. +--------------------+-------------------+-------------------+-------------------+


虽然过期时间设置为一小时,但我仍然看到所有其他应该删除的快照。
我知道我总是可以用

  1. spark.sql("CALL dev.system.expire_snapshots('dev.lakehouse.trips4', TIMESTAMP '2023-11-08 11:00:00.000', 1)")


这将删除旧的时间戳,但它不应该是自动完成的吗?

tquggr8v

tquggr8v1#

过期的Iceberg表快照不会自动删除。
快照会不断累积,直到它们被expireSnapshots操作过期。建议定期过期快照以删除不再需要的数据文件。
配置history.expire.max-snapshot-age-ms
快照到期时保留在表及其所有分支上的快照的默认最大期限。
您显示的代码spark.sql("CALL dev.system.expire_snapshots('dev.lakehouse.trips4', TIMESTAMP '2023-11-08 11:00:00.000', 1)")Spark SQL extension for expiring snapshots

相关问题