如果源/接收器/运算符具有未定义的uid或名称,则flink作业失败

zzwlnbp8  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(542)

在我的工作中,我希望每个源/汇/操作员都应该有 uid 以及 name 为便于识别而定义的属性。

operator.process(myFunction).uid(MY_FUNCTION).name(MY_FUNCTION);

现在我需要手动检查每项工作,以检测丢失的设置。我怎么能让Flink放弃工作呢 name 或者 uid 没有定义?

xzv2uavs

xzv2uavs1#

一旦你得到一个 StreamExecutionEnvironment 你可以得到算符的图形。
当你没有定义一个名字时,flink会自动为你生成一个。此外,如果您设置了一个名称,至少在源或汇的情况下,flink会添加一个前缀 Source: 或者 Sink: 为了这个名字。
如果不定义uid,则此阶段图形中的uid值为null。
在您的场景中,名称和uid始终相同,要检查是否为所有运算符提供了名称和uid,可以执行以下操作:

getExecutionEnvironment().getStreamGraph().getStreamNodes().stream()
            .filter(streamNode -> streamNode.getTransformationUID() == null ||
                    !streamNode.getOperatorName().contains(streamNode.getTransformationUID()))
            .forEach(System.out::println);

此代码段将打印所有与规则不匹配的运算符。
这在100%的情况下是行不通的,比如使用uid作为名称的子字符串。但这里有一个访问操作员信息的通用方法,可以应用适合您的情况的过滤器并执行您自己的策略。
此代码段可以反用作ci的一部分,也可以直接在应用程序中使用。

相关问题