我想使用保存点机制将现有作业从Flink的一个版本移动到另一个版本,方法是:
1.停止具有保存点的作业
1.在新版本上从保存点创建新作业。
直到Flink 1.14我都没有问题,但在Flink 1.15.1中,它失败了。即使不改变版本,停留在1.15.1中,它也失败了。我得到了这个错误,这意味着它不能将状态从以前的作业Map到新的作业,因为一个操作符:
Failed to rollback to checkpoint/savepoint hdfs://hdfs-name:8020/flink-savepoints/savepoint-046708-238e921f5e78. Cannot map checkpoint/savepoint state for operator d14a399e92154660771a806b90515d4c to the new program, because the operator is not available in the new program.
经过调查,有问题的运算符对应于一个ChangelogNormalize
运算符,我没有显式地创建它。它是因为我使用tableEnv.fromChangelogStream(stream, schema, ChangelogMode.upsert())
而生成的(upsert模式很重要,其他模式不会失败)。创建的表被传递给使用SQL API的SQL查询,它生成如下所示的内容:
ChangelogNormalize[8] -> Calc[9] -> TableToDataSteam -> [my_sql_transformation] -> [my_sink]
在以前的Flink版本中,这个操作符总是被赋予相同的uid,这样当从保存点开始时,状态可以匹配。在Flink 1.15.1中,每次都生成一个不同的uid。我找不到一个可靠的方法来手动设置这个uid。我找到的唯一方法是从转换开始向后:
dataStream.getTransformation().getInputs().get(0).getInputs().get(0).getInputs().get(0).setUid("the_user_defined_id");
但我希望有更好的方法来做到这一点。
你知道我做错了什么吗?会不会是Flink的一个漏洞?
1条答案
按热度按时间58wvjzkj1#
在JIRA上打开一个问题后,它似乎确实是一个bug。
目前的一个解决方法是将
table.exec.legacy-transformation-uids
设置为true
。