又是数据处理菜鸟。
我的主要目标是从内部资源获取表,将它们作为Parquet-file存储在云存储桶中,并从该文件在BigQuery中创建/更新表。在我之前关于Dataproc和Hudi conf的文章之后,我能够通过Dataproc/PySpark/Hudi从内部资源部署和获取表,并将它们存储在云存储中。
下一个问题是关于'upsert'配置文件在'hudi_options'和如何可以追加新的结果在parquet文件是云存储桶。这是不清楚我是你可以更新/改变一个parquet文件与hudi。我想避免删除以前的加载和只存储一个parquet文件每个表。
上插代码:
table_location = "gs://bucket/{}/".format(table_name)
updates = spark.read.format("jdbc") \
.option("url",url) \
.option("user", username) \
.option("password", password) \
.option("driver", "com.sap.db.jdbc.Driver") \
.option("query", query) \
.load()
hudi_options = {
'hoodie.table.name': table_name,
'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
'hoodie.datasource.write.recordkey.field': 'a,b,c,d,e',
'hoodie.datasource.write.table.name': table_name,
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.precombine.field': 'x',
'hoodie.upsert.shuffle.parallelism': 2,
'hoodie.insert.shuffle.parallelism': 2,
'path': table_location,
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.database': database_name,
'hoodie.datasource.hive_sync.table': table_name,
'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
'hoodie.datasource.hive_sync.use_jdbc': 'false',
'hoodie.datasource.hive_sync.mode': 'hms'
}
updates.write.format("hudi") \
.options(**hudi_options) \
.mode("append") \
.save()
但是,在云存储中生成另一个Parquet文件。
我在配置中遗漏了什么吗?
谢谢!
1条答案
按热度按时间sshcrbum1#
Apache Hudi和其他开放数据Lakehouse格式(ApacheIceberg和Delta)创建数据快照,以便将ACID事务转换为parquet格式。在每次提交中,它们读取更改分区中的现有数据,将更改应用到内存和tmp文件中(添加新数据、删除或更新现有数据),然后将整个数据重写到分区中。稍后,它们会根据清理策略删除旧文件,您可以使用它们进行时间旅行查询或回滚。
我希望避免删除以前的加载
虽然您的
OPERATION_OPT_KEY
不是insert_overwrite_table
或insert_overwrite
,Spark保存模式不是overwrite
,但不必担心以前的负载。并且每个表仅存储一个Parquet-file
如果您想压缩数据并在表中有一个大的 parquet 文件,则需要增加
hoodie.parquet.max.file.size
,默认情况下为120 MB,因为Hudi会将超过此大小的 parquet 文件拆分为块。