有没有办法确保所有checkpointlisteners都在flink on job cancel with savepoint上收到关于完成检查点的通知?

bsxbgnwa  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(463)

我使用的是flink1.9和restapi /jobs/:jobid/savepoints 触发保存点并取消作业(优雅地停止作业以便稍后从保存点运行)。
我在源代码中使用了一个两阶段提交函数,所以我的源代码实现了这两个功能 CheckpointedFunction 以及 CheckpointListener 接口。在 snapshotState() 方法调用我快照内部状态并打开 notifyCheckpointComplete() 我检查点状态到第三方系统。
从源代码中可以看到,只有 snapshotState() 部件同步于 CheckpointCoordinator -

// send the messages to the tasks that trigger their checkpoint
                for (Execution execution: executions) {
                    if (props.isSynchronous()) {
                        execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
                    } else {
                        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
                    }
                }

检查点确认和完成通知在中是异步的 AsyncCheckpointRunnable .
也就是说,当 savepointcancel-job 设置为 true 被触发时,在拍摄快照之后,某些任务管理器会在取消和执行作业之前保持接收完成通知 notifyCheckpointComplete() ,有些则不然。
问题是是否有一种方法可以用savepoint取消作业,这样 notifyCheckpointComplete() 是否保证在取消作业之前由所有任务管理器调用,或者目前无法实现这一点?

n3ipq98p

n3ipq98p1#

我已经有一段时间没有看Flink1.9了,所以请谨慎地接受我的回答。
我猜你的消息来源取消得太早了。所以呢 notifyCheckpointComplete 实际上发送到所有任务,但是 SourceFunction 他已经辞职了 run 并对各自的任务进行了清理。
好吧,如果你在收到最后一封信之前忽略取消和中断,你所描述的应该是可能的 notifyCheckpointComplete .

class YourSource implements SourceFunction<Object>, CheckpointListener, CheckpointedFunction {
    private volatile boolean canceled = false;
    private volatile boolean pendingCheckpoint = false;

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        pendingCheckpoint = true;
        // start two-phase commit
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {

    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // finish two-phase commit
        pendingCheckpoint = false;
    }

    @Override
    public void run(SourceContext<Object> ctx) throws Exception {
        while (!canceled) {
            // do normal source stuff
        }
        // keep the task running after cancellation
        while (pendingCheckpoint) {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                // ignore interruptions until two-phase commit is done
            }
        }
    }

    @Override
    public void cancel() {
        canceled = true;
    }
}
oknrviil

oknrviil2#

使用stop with savepoint[1][2]难道不能解决问题吗?
[1]https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-作业ID停止[2]https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html

相关问题