我使用Dataroc Metastore,Dataproc Batch和Pyspark。虽然我使用GCP,我相信这是一般的Apache冰山问题。
我运行我的Spark作业,并创建了冰山旅行表与自动快照过期在1小时history.expire.max-snapshot-age-ms=3600000
和写从CSV文件的内容到表。
conf = (
SparkConf()
.setAppName('read_from_iceberg')
.set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
.set('spark.sql.catalog.spark_catalog', 'org.apache.iceberg.spark.SparkSessionCatalog')
.set('spark.sql.catalog.spark_catalog.type', 'hive')
.set(f'spark.sql.catalog.dev', 'org.apache.iceberg.spark.SparkCatalog')
.set(f'spark.sql.catalog.dev.type', 'hive')
.set(f'spark.sql.warehouse.dir', lake_bucket)
)
spark = SparkSession.builder.enableHiveSupport().config(conf=conf).getOrCreate()
schema = StructType([
StructField("vendor_id", LongType(), True),
StructField("trip_id", LongType(), True),
StructField("trip_distance", FloatType(), True),
StructField("fare_amount", DoubleType(), True),
StructField("store_and_fwd_flag", StringType(), True)
])
# Create database if not exists
spark.sql("CREATE DATABASE IF NOT EXISTS dev.lakehouse")
# Create table if doesn't exist.
# df = spark.createDataFrame([], schema)
df.writeTo("dev.lakehouse.trips4").partitionedBy("vendor_id")
.tableProperty('format-version', '2')
.tableProperty("history.expire.max-snapshot-age-ms","3600000").createOrReplace())
df3 = spark.read.option("delimiter", ",").schema(schema).option("header", True).csv(
"gs://my-super-bucket/csv-input/bulk/*")
df3.write.mode('append').format("iceberg").insertInto("dev.lakehouse.trips")
字符串
我重复了几次批处理执行,结果我有4500万个写对象。
+---------+
| count(1)|
+---------+
|450000000|
+---------+
型
现在我想看看table的历史。
spark.sql("SELECT * FROM dev.lakehouse.trips4.history").show()
型
结果如下:
+--------------------+-------------------+-------------------+-------------------+
| made_current_at| snapshot_id| parent_id|is_current_ancestor|
+--------------------+-------------------+-------------------+-------------------+
|2023-11-08 09:05:...|3365635318905728444| null| true|
|2023-11-08 09:07:...|8818173850344394660|3365635318905728444| true|
|2023-11-08 09:18:...|7080281147456503211|8818173850344394660| true|
|2023-11-08 09:26:...|1124704647664806615|7080281147456503211| true|
|2023-11-08 09:43:...|1410379929547885531|1124704647664806615| true|
|2023-11-08 09:44:...|2828602979849095888|1410379929547885531| true|
|2023-11-08 11:59:...|3836167930220261494|2828602979849095888| true|
|2023-11-08 12:09:...|7872321137982208330|3836167930220261494| true|
+--------------------+-------------------+-------------------+-------------------+
型
虽然过期时间设置为一小时,但我仍然看到所有其他应该删除的快照。
我知道我总是可以用
spark.sql("CALL dev.system.expire_snapshots('dev.lakehouse.trips4', TIMESTAMP '2023-11-08 11:00:00.000', 1)")
型
这将删除旧的时间戳,但它不应该是自动完成的吗?
1条答案
按热度按时间tquggr8v1#
过期的Iceberg表快照不会自动删除。
快照会不断累积,直到它们被
expireSnapshots
操作过期。建议定期过期快照以删除不再需要的数据文件。配置
history.expire.max-snapshot-age-ms
是快照到期时保留在表及其所有分支上的快照的默认最大期限。
您显示的代码
spark.sql("CALL dev.system.expire_snapshots('dev.lakehouse.trips4', TIMESTAMP '2023-11-08 11:00:00.000', 1)")
是Spark SQL extension for expiring snapshots。