Flink "使用fromChangelogStream时,无法Map操作员"的检查点/保存点状态

7vux5j2d  于 2022-12-09  发布在  Apache
关注(0)|答案(1)|浏览(300)

我想使用保存点机制将现有作业从Flink的一个版本移动到另一个版本,方法是:
1.停止具有保存点的作业
1.在新版本上从保存点创建新作业。
直到Flink 1.14我都没有问题,但在Flink 1.15.1中,它失败了。即使不改变版本,停留在1.15.1中,它也失败了。我得到了这个错误,这意味着它不能将状态从以前的作业Map到新的作业,因为一个操作符:

Failed to rollback to checkpoint/savepoint hdfs://hdfs-name:8020/flink-savepoints/savepoint-046708-238e921f5e78. Cannot map checkpoint/savepoint state for operator d14a399e92154660771a806b90515d4c to the new program, because the operator is not available in the new program.

经过调查,有问题的运算符对应于一个ChangelogNormalize运算符,我没有显式地创建它。它是因为我使用tableEnv.fromChangelogStream(stream, schema, ChangelogMode.upsert())而生成的(upsert模式很重要,其他模式不会失败)。创建的表被传递给使用SQL API的SQL查询,它生成如下所示的内容:

ChangelogNormalize[8] -> Calc[9] -> TableToDataSteam -> [my_sql_transformation] -> [my_sink]

在以前的Flink版本中,这个操作符总是被赋予相同的uid,这样当从保存点开始时,状态可以匹配。在Flink 1.15.1中,每次都生成一个不同的uid。我找不到一个可靠的方法来手动设置这个uid。我找到的唯一方法是从转换开始向后:

dataStream.getTransformation().getInputs().get(0).getInputs().get(0).getInputs().get(0).setUid("the_user_defined_id");

但我希望有更好的方法来做到这一点。
你知道我做错了什么吗?会不会是Flink的一个漏洞?

58wvjzkj

58wvjzkj1#

在JIRA上打开一个问题后,它似乎确实是一个bug。
目前的一个解决方法是将table.exec.legacy-transformation-uids设置为true

相关问题