flink keyedstream生成具有相同密钥和窗口时间戳的重复结果

e1xvtsh3  于 2021-06-24  发布在  Flink
关注(0)|答案(2)|浏览(369)

以下是我的flink工作流程:

DataStream<FlinkEvent> events = env.addSource( consumer ).flatMap(...).assignTimestampsAndWatermarks( new EventTsExtractor() );
DataStream<SessionStatEvent> sessionEvents = events.keyBy( 
    new KeySelector<FlinkEvent, Tuple2<String, String> >()
            {
                @Override
                public Tuple2<String, String> getKey( FlinkEvent value ) throws Exception {
                    return(Tuple2.of( value.getF0(), value.getSessionID ) );
                }
            } )
      .window( TumblingEventTimeWindows.of( Time.minutes( 2 ) ) )
      .allowedLateness( Time.seconds( 10 ) )
      .aggregate( new SessionStatAggregator(), new SessionStatProcessor() );
/* ... */
sessionEvents.addSink( esSinkBuilder.build() );

我第一次遇到
java.lang.exception:org.apache.flink.streaming.runtime.tasks.exceptioninchainedoperatorexception:无法将元素转发给下一个运算符
flatMap 操作员和任务继续重新启动。我通过相同的键和窗口时间戳观察到许多不同值的重复结果。
问题1:我猜重复是因为下游操作员在作业重新启动后重复使用消息。我说得对吗?我在修正了错误后重新提交了这份工作 ExceptionInChainedOperatorException 问题。我又在第一个时间窗口中观察到了复制品。在那之后,这项工作似乎很顺利(每个键在一个时间窗口中有一个结果)。
问题2:复制品是从哪里来的?

2w3kk1z5

2w3kk1z51#

这就是Flink实现一次语义的方式。在失败的情况下,flink会重放上次成功检查点的事件。需要注意的是,只影响一次状态,而不是只处理/发布一次事件。
回答问题1:是的,每次重新启动都会一次又一次地处理相同的消息回答问题2:错误修复后的第一个窗口会再次处理这些消息;然后一切恢复正常。

gv8xihay

gv8xihay2#

... 对于一个窗口,每个键应该有一个结果
这(完全)是不对的。由于允许延迟,任何延迟事件(在允许的延迟时间内)都将导致相关窗口延迟(或者换句话说,额外)触发。使用默认的eventtimetrigger(您似乎正在使用它),每个延迟事件都会触发一个额外的窗口,并且会发出一个更新的窗口结果。

相关问题