Flink savepoint is declined

evrscar2 posted on 2021-06-25 in Flink

I am trying to use savepoints on a job that implements a custom, parallelizable socket source. The source looks similar to this:

@Override
public void run(SourceContext<String> sourceContext) throws Exception {
    int idx = getRuntimeContext().getIndexOfThisSubtask();
    String[] hosts = (config.hostsStr).split(":");
    String[] portStrArr = (config.portsStr).split(":");
    int[] ports = new int[portStrArr.length];
    for (int i = 0; i < portStrArr.length; i++) {
        ports[i] = Integer.parseInt(portStrArr[i]);
    }
    Socket s = new Socket(hosts[idx], ports[idx]);
    BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()));
    //ois = new ObjectInputStream(s.getInputStream());
    while (running) {
        String str  = in.readLine();
        sourceContext.collect(str);
    }
    sourceContext.close();
}
@Override
public void cancel() {
    running = false;
}

On the cluster, the exception looks like this:

flink-1.1.3/bin//flink cancel -s hdfs://flink-master:19000/flink-checkpoints a18499a80099045eb5120ecacdabd421
Retrieving JobManager.
Using address flink-master/10.0.0.16:6123 to connect to JobManager.
Cancelling job a18499a80099045eb5120ecacdabd421 with savepoint to hdfs://flink-master:19000/flink-checkpoints.

java.lang.Exception: Canceling the job with ID a18499a80099045eb5120ecacdabd421 failed.
    at org.apache.flink.client.CliFrontend.cancel(CliFrontend.java:637)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1092)
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133)
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130)
    at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1130)
Caused by: java.lang.Exception: Failed to trigger savepoint.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$6.apply(JobManager.scala:639)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$6.apply(JobManager.scala:629)
    at org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272)
    at akka.dispatch.OnComplete.internal(Future.scala:247)
    at akka.dispatch.OnComplete.internal(Future.scala:245)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.EOFException: Premature EOF: no length prefix available
    at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2282)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1347)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1266)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:449)
Suppressed: java.lang.IllegalArgumentException: Self-suppression not permitted
    at java.lang.Throwable.addSuppressed(Throwable.java:1043)
    at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
    at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.storeSavepointToHandle(SavepointStore.java:207)
    at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.storeSavepointToHandle(SavepointStore.java:150)
    at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpointExternalized(PendingCheckpoint.java:281)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:888)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:813)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply$mcV$sp(JobManager.scala:1462)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply(JobManager.scala:1461)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply(JobManager.scala:1461)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[CIRCULAR REFERENCE:java.io.EOFException: Premature EOF: no length prefix available]

On my local machine, the savepoint is declined with the following exception:

Cancelling job 4c99e0220c8c4683d1287269073b5c2c with savepoint to savepoints/.
java.lang.Exception: Canceling the job with ID 4c99e0220c8c4683d1287269073b5c2c failed.
    at org.apache.flink.client.CliFrontend.cancel(CliFrontend.java:637)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1092)
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133)
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130)
    at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1130)
Caused by: java.lang.Exception: Failed to trigger savepoint.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$6.apply(JobManager.scala:639)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$6.apply(JobManager.scala:629)
    at org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272)
    at akka.dispatch.OnComplete.internal(Future.scala:247)
    at akka.dispatch.OnComplete.internal(Future.scala:245)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Checkpoint was declined (tasks not ready)
    at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortDeclined(PendingCheckpoint.java:510)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:735)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$2.apply$mcV$sp(JobManager.scala:1491)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$2.apply(JobManager.scala:1490)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$2.apply(JobManager.scala:1490)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    ... 6 more

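Is the checkpoint failing because my source cannot be stopped properly? On the cluster, the command does report success and returns the savepoint location, but there is no file at that path.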

mgdq6dx1


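Judging from the source function excerpt, it looks almost right. What you should do is emit elements under the checkpoint lock. Otherwise you can run into problems when an element is emitted at the same moment a checkpoint is triggered. The lock returned by SourceContext#getCheckpointLock ensures that these two operations do not happen concurrently.
The first error looks like a problem on the HDFS side. Can you check the HDFS logs for anything suspicious? The data nodes may have run out of disk space.
The second exception says that something went wrong while the checkpoint was being taken. The JobManager log should contain a statement explaining why the checkpoint failed. It should have the format: Discarding checkpoint CHECKPOINT_ID because of checkpoint decline from task EXECUTION_ID : REASON.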
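
The checkpoint-lock advice can be sketched roughly as follows. To keep the example compilable without Flink on the classpath, `Ctx` is a hypothetical stand-in for the two `SourceFunction.SourceContext` methods used here (`collect` and `getCheckpointLock`); `ListCtx`, `emitLines`, and the static `running` flag are likewise illustrative names, not part of the original job. The sketch also stops when `readLine()` returns null, which is an addition beyond what the answer asks for.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Sketch only: Ctx is a hypothetical stand-in for the two methods of Flink's
// SourceFunction.SourceContext used below, so this compiles without Flink.
class LockedEmitSketch {

    interface Ctx<T> {
        void collect(T element);
        Object getCheckpointLock();
    }

    // In-memory context that records every emitted element.
    static class ListCtx implements Ctx<String> {
        final List<String> emitted = new ArrayList<>();
        private final Object lock = new Object();
        @Override public void collect(String element) { emitted.add(element); }
        @Override public Object getCheckpointLock() { return lock; }
    }

    // Set to false from cancel() in a real source.
    static volatile boolean running = true;

    // Reads lines until cancelled or the stream ends; returns the number emitted.
    static int emitLines(BufferedReader in, Ctx<String> ctx) {
        int count = 0;
        try {
            String line;
            // readLine() returns null at end of stream; stop instead of emitting null.
            while (running && (line = in.readLine()) != null) {
                // The blocking read stays outside the lock; only the emit is guarded,
                // so a checkpoint cannot interleave with collect().
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(line);
                }
                count++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return count;
    }

    public static void main(String[] args) {
        ListCtx ctx = new ListCtx();
        int n = emitLines(new BufferedReader(new StringReader("a\nb\nc\n")), ctx);
        System.out.println(n + " lines emitted: " + ctx.emitted);
    }
}
```

In the actual source, the same pattern would wrap `sourceContext.collect(str)` in `synchronized (sourceContext.getCheckpointLock()) { ... }` inside the `while (running)` loop of `run`.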
