我在使用flink将数据插入Apachi hudi表时遇到错误

hrirmatl  于 2022-12-09  发布在  Apache
关注(0)|答案(1)|浏览(438)

使用环境:
Flink :1.15.2
胡迪flink:胡迪-flink1.15-捆绑包-0.12.0.jar
执行语句时:

Flink SQL> CREATE TABLE t1(
>   uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
>   name VARCHAR(10),
>   age INT,
>   ts TIMESTAMP(3),
>   `partition` VARCHAR(20)
> )
> PARTITIONED BY (`partition`)
> WITH (
>   'connector' = 'hudi',
>   'path' = 's3a://flink-hudi/t1',
>   'table.type' = 'MERGE_ON_READ'
> );
[INFO] Execute statement succeed.

Flink SQL> INSERT INTO t1 VALUES ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 77f98b6de7db94a52206b2a4f7961141

hudi表t1被创建,但仅包含元数据directory.hoodie。

    • 出现错误:**

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:301) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739) at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443) at sun.reflect.GeneratedMethodAccessor61.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at akka.actor.Actor.aroundReceive(Actor.scala:537) at akka.actor.Actor.aroundReceive$(Actor.scala:535) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) at akka.actor.ActorCell.invoke(ActorCell.scala:548) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) at akka.dispatch.Mailbox.run(Mailbox.scala:231) at akka.dispatch.Mailbox.exec(Mailbox.scala:243) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) Caused by: org.apache.hudi.exception.HoodieException: Exception while scanning the checkpoint meta files under path: s3a://flink-hudi/t1/.hoodie/.aux/ckp_meta at org.apache.hudi.sink.meta.CkpMetadata.load(CkpMetadata.java:169) at org.apache.hudi.sink.meta.CkpMetadata.lastPendingInstant(CkpMetadata.java:175) at org.apache.hudi.sink.common.AbstractStreamWriteFunction.lastPendingInstant(AbstractStreamWriteFunction.java:243) at org.apache.hudi.sink.common.AbstractStreamWriteFunction.initializeState(AbstractStreamWriteFunction.java:151) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:94) at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.FileNotFoundException: No such file or directory: s3a://flink-hudi/t1/.hoodie/.aux/ckp_meta at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2344) at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2226) at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2160) at org.apache.hadoop.fs.s3a.S3AFileSystem.innerListStatus(S3AFileSystem.java:1961) at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$listStatus$9(S3AFileSystem.java:1940) at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109) at org.apache.hadoop.fs.s3a.S3AFileSystem.listStatus(S3AFileSystem.java:1940) at org.apache.hudi.common.fs.HoodieWrapperFileSystem.lambda$listStatus$15(HoodieWrapperFileSystem.java:365) at org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeMetrics(HoodieWrapperFileSystem.java:106) at org.apache.hudi.common.fs.HoodieWrapperFileSystem.listStatus(HoodieWrapperFileSystem.java:364) at org.apache.hudi.sink.meta.CkpMetadata.scanCkpMetadata(CkpMetadata.java:216) at org.apache.hudi.sink.meta.CkpMetadata.load(CkpMetadata.java:167) ... 18 more

预期将数据成功插入hudi表。

ecbunoof

ecbunoof1#

看起来Flink作业正在尝试从状态恢复,但是Hudi遇到了由No such file or directory: s3a://flink-hudi/t1/.hoodie/.aux/ckp_meta引起的错误。由于您在启动时确认存储桶是干净的,所以我能提供的最佳建议是使用s3p:///文件系统,迫使Flink使用Presto实现(与Hadoop版本相比)。请参阅以下评论,了解为什么使用Hadoop文件系统会导致“文件未找到”的情况。

相关问题