Apache Hudi version 0.13.0, Spark version 3.3.2
I am very new to Hudi and MinIO and have been trying to write a table from a local database to MinIO in Hudi format. I am writing with the overwrite save mode. The first run writes the table successfully, but every subsequent run of the script fails with an error. With the same configuration I can write to the table multiple times using append, but overwrite throws an error after the first run.
[error] org.apache.hudi.exception.HoodieIOException: Could not load Hoodie properties from s3a://hudi/status_device_view/test7/.hoodie/metadata/.hoodie/hoodie.properties
[error] at org.apache.hudi.common.table.HoodieTableConfig.<init>(HoodieTableConfig.java:289)
[error] at org.apache.hudi.common.table.HoodieTableMetaClient.<init>(HoodieTableMetaClient.java:138)
[error] at org.apache.hudi.common.table.HoodieTableMetaClient.newMetaClient(HoodieTableMetaClient.java:689)
[error] at org.apache.hudi.common.table.HoodieTableMetaClient.access$000(HoodieTableMetaClient.java:81)
[error] at org.apache.hudi.common.table.HoodieTableMetaClient$Builder.build(HoodieTableMetaClient.java:770)
[error] at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.enablePartitions(HoodieBackedTableMetadataWriter.java:202)
[error] at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.<init>(HoodieBackedTableMetadataWriter.java:177)
[error] at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.<init>(SparkHoodieBackedTableMetadataWriter.java:104)
[error] at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.create(SparkHoodieBackedTableMetadataWriter.java:79)
[error] at org.apache.hudi.client.SparkRDDWriteClient.initializeMetadataTable(SparkRDDWriteClient.java:341)
[error] at org.apache.hudi.client.SparkRDDWriteClient.initMetadataTable(SparkRDDWriteClient.java:330)
[error] at org.apache.hudi.client.BaseHoodieWriteClient.doInitTable(BaseHoodieWriteClient.java:1133)
[error] at org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1169)
[error] at org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1198)
[error] at org.apache.hudi.client.SparkRDDWriteClient.insert(SparkRDDWriteClient.java:162)
[error] at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:204)
[error] at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:363)
[error] at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
[error] at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
[error] at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
[error] at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
[error] at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
[error] at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
[error] at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
[error] at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
[error] at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
[error] at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
[error] at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
[error] at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
[error] at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
[error] at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
[error] at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
[error] at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
[error] at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
[error] at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
[error] at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
[error] at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
[error] at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
[error] at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
[error] at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
[error] at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
[error] at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
[error] at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
[error] at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
[error] at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
[error] at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
[error] at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
[error] at minio$.main(minio.scala:94)
[error] at minio.main(minio.scala)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error] at java.lang.reflect.Method.invoke(Method.java:498)
[error] Caused by: java.io.FileNotFoundException: No such file or directory: s3a://hudi/status_device_view/test7/.hoodie/metadata/.hoodie/hoodie.properties.backup
[error] at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3866)
[error] at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
[error] at org.apache.hadoop.fs.s3a.S3AFileSystem.extractOrFetchSimpleFileStatus(S3AFileSystem.java:5401)
[error] at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:1465)
[error] at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:1441)
[error] at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:976)
[error] at org.apache.hudi.common.fs.HoodieWrapperFileSystem.open(HoodieWrapperFileSystem.java:476)
[error] at org.apache.hudi.common.table.HoodieTableConfig.fetchConfigs(HoodieTableConfig.java:343)
[error] at org.apache.hudi.common.table.HoodieTableConfig.<init>(HoodieTableConfig.java:270)
[error] at org.apache.hudi.common.table.HoodieTableMetaClient.<init>(HoodieTableMetaClient.java:138)
[error] at org.apache.hudi.common.table.HoodieTableMetaClient.newMetaClient(HoodieTableMetaClient.java:689)
[error] at org.apache.hudi.common.table.HoodieTableMetaClient.access$000(HoodieTableMetaClient.java:81)
[error] at org.apache.hudi.common.table.HoodieTableMetaClient$Builder.build(HoodieTableMetaClient.java:770)
[error] at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.enablePartitions(HoodieBackedTableMetadataWriter.java:202)
[error] at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.<init>(HoodieBackedTableMetadataWriter.java:177)
[error] at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.<init>(SparkHoodieBackedTableMetadataWriter.java:104)
[error] at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.create(SparkHoodieBackedTableMetadataWriter.java:79)
[error] at org.apache.hudi.client.SparkRDDWriteClient.initializeMetadataTable(SparkRDDWriteClient.java:341)
[error] at org.apache.hudi.client.SparkRDDWriteClient.initMetadataTable(SparkRDDWriteClient.java:330)
[error] at org.apache.hudi.client.BaseHoodieWriteClient.doInitTable(BaseHoodieWriteClient.java:1133)
[error] at org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1169)
[error] at org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1198)
[error] at org.apache.hudi.client.SparkRDDWriteClient.insert(SparkRDDWriteClient.java:162)
[error] at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:204)
[error] at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:363)
[error] at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
[error] at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
[error] at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
[error] at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
[error] at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
[error] at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
[error] at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
[error] at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
[error] at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
[error] at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
[error] at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
[error] at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
[error] at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
[error] at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
[error] at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
[error] at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
[error] at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
[error] at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
[error] at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
[error] at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
[error] at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
[error] at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
[error] at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
[error] at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
[error] at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
[error] at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
[error] at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
[error] at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
[error] at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
[error] at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
[error] at minio$.main(minio.scala:94)
[error] at minio.main(minio.scala)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error] at java.lang.reflect.Method.invoke(Method.java:498)
[error] stack trace is suppressed; run last Compile / run for the full output
[error] (Compile / run) org.apache.hudi.exception.HoodieIOException: Could not load Hoodie properties from s3a://hudi/status_device_view/test7/.hoodie/metadata/.hoodie/hoodie.properties
The code used to write the table is:
status_device_join.write.format("org.apache.hudi")
  .options(Config)
  .mode("overwrite")
  .save("s3a://hudi/status_device_view/test7")
The configuration I am using is:
val Config = scala.collection.mutable.Map(
  "className" -> "org.apache.hudi",
  "hoodie.datasource.hive_sync.use_jdbc" -> "false",
  "hoodie.datasource.write.recordkey.field" -> "serial_number",
  "hoodie.datasource.write.partitionpath.field" -> "",
  "hoodie.datasource.write.precombine.field" -> "mac_address",
  "hoodie.table.name" -> "status_device_join",
  "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.NonPartitionedExtractor",
  "hoodie.datasource.write.keygenerator.class" -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
  "hoodie.datasource.write.operation" -> "insert",
  "hoodie.filesystem.view.remote.retry.enable" -> "true",
  "hoodie.embed.timeline.server" -> "false",
  "hoodie.metadata.enable" -> "true",
  "hoodie.clustering.preserve.commit.metadata" -> "true",
  "hoodie.datasource.write.table.type" -> "COPY_ON_WRITE"
)
This is the Spark session I created:
val sc = SparkSession.builder().master("local").appName("minio")
  .config("spark.driver.extraClassPath", "/Users/Downloads/postgresql-42.6.0.jar")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.sql.hive.convertMetastoreParquet", value = false)
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
  .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .config("spark.hadoop.spark.sql.legacy.parquet.nanosAsLong", value = false)
  .getOrCreate()
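The MinIO connection itself is supplied through the standard Hadoop s3a properties on this session rather than in the snippet above; roughly as follows, where the endpoint and credentials are placeholders rather than real values:

// Hadoop s3a settings for the MinIO endpoint (all values here are placeholders)
val hadoopConf = sc.sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3a.endpoint", "http://127.0.0.1:9000")  // placeholder MinIO endpoint
hadoopConf.set("fs.s3a.access.key", "<access-key>")         // placeholder credentials
hadoopConf.set("fs.s3a.secret.key", "<secret-key>")
hadoopConf.set("fs.s3a.path.style.access", "true")          // path-style access is typically needed for MinIO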
I expected the old table to be overwritten by the new write, but instead I get the error above. Any help with this error is appreciated.
1 Answer
In Hudi, the way to overwrite a table with an incoming DataFrame is to keep SaveMode.Append and set hoodie.datasource.write.operation=insert_overwrite_table. Incidentally, you can also replace only the partitions present in the incoming DataFrame by using the insert_overwrite operation instead; a sketch of both is shown below.
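A minimal sketch of that write, reusing the Config map and table path from the question (the operation override and the Append save mode are the only changes; everything else is assumed to stay as in the original code):

// Keep SaveMode.Append; the overwrite is expressed through the Hudi write operation.
// "insert_overwrite_table" replaces the whole table, while "insert_overwrite" replaces
// only the partitions present in the incoming DataFrame.
status_device_join.write.format("org.apache.hudi")
  .options(Config)  // Config still carries "insert"; the .option below overrides it, since later values win
  .option("hoodie.datasource.write.operation", "insert_overwrite_table")  // or "insert_overwrite"
  .mode("append")
  .save("s3a://hudi/status_device_view/test7")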