azure blob存储与apache flink 1.10

bqujaahr  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(486)

我正在尝试使用带有ApacheFlink1.10的azure blob存储进行检查点。
我遵循了[flink文档][1]中提到的所有说明https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/filesystems/azure.html

这是我在flink-conf.xml中看到的 #Azure Storage Key**fs.azure.account.key.<storage-account>.blob.core.windows.net:** ##第三步:
使用azure blob存储进行检查点设置
这就是我在Flink的工作 final StateBackend stateBackend = new FsStateBackend("wasb://flink-blob@$<storage-account>.blob.core.windows.net/checkpoint"); 我不确定我是否错过了什么,但当我提交的工作,我得到以下例外(asktimeoutexception从演员)。

at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:199)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1741)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:94)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:63)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
    at com.example.flink.checkpointing.CheckpointExample.main(CheckpointExample.java:78)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
    ... 8 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1736)
    ... 17 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:359)
    at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
    at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085)
    at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#75666936]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
    at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
    at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
    at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
    at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
    at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
    at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
    at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
    at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
    at java.base/java.lang.Thread.run(Thread.java:834)

End of exception on server side>]
    at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
    ... 4 more```
z9ju0rcb

z9ju0rcb1#

我认为你的问题是双重的。真正的失败原因隐藏在 AskTimeoutException . 这个问题已经用flink-16018解决了,它将与flink一起发布 1.10.1 . 问题是超时值太过激进,因此持久的作业提交将在客户端失败。
至于真正的失败原因,我建议你看看Flink的 jobmanager.log . 它应该包含出错的信息。我怀疑azure blob存储的配置有误。

相关问题