在我的工作中,我希望每个源/汇/操作员都应该有 uid 以及 name 为便于识别而定义的属性。
uid
name
operator.process(myFunction).uid(MY_FUNCTION).name(MY_FUNCTION);
现在我需要手动检查每项工作,以检测丢失的设置。我怎么能让Flink放弃工作呢 name 或者 uid 没有定义?
xzv2uavs1#
一旦你得到一个 StreamExecutionEnvironment 你可以得到算符的图形。当你没有定义一个名字时,flink会自动为你生成一个。此外,如果您设置了一个名称,至少在源或汇的情况下,flink会添加一个前缀 Source: 或者 Sink: 为了这个名字。如果不定义uid,则此阶段图形中的uid值为null。在您的场景中,名称和uid始终相同,要检查是否为所有运算符提供了名称和uid,可以执行以下操作:
StreamExecutionEnvironment
Source:
Sink:
getExecutionEnvironment().getStreamGraph().getStreamNodes().stream() .filter(streamNode -> streamNode.getTransformationUID() == null || !streamNode.getOperatorName().contains(streamNode.getTransformationUID())) .forEach(System.out::println);
此代码段将打印所有与规则不匹配的运算符。这在100%的情况下是行不通的,比如使用uid作为名称的子字符串。但这里有一个访问操作员信息的通用方法,可以应用适合您的情况的过滤器并执行您自己的策略。此代码段可以反用作ci的一部分,也可以直接在应用程序中使用。
1条答案
按热度按时间xzv2uavs1#
一旦你得到一个
StreamExecutionEnvironment
你可以得到算符的图形。当你没有定义一个名字时,flink会自动为你生成一个。此外,如果您设置了一个名称,至少在源或汇的情况下,flink会添加一个前缀
Source:
或者Sink:
为了这个名字。如果不定义uid,则此阶段图形中的uid值为null。
在您的场景中,名称和uid始终相同,要检查是否为所有运算符提供了名称和uid,可以执行以下操作:
此代码段将打印所有与规则不匹配的运算符。
这在100%的情况下是行不通的,比如使用uid作为名称的子字符串。但这里有一个访问操作员信息的通用方法,可以应用适合您的情况的过滤器并执行您自己的策略。
此代码段可以反用作ci的一部分,也可以直接在应用程序中使用。