以下是我的flink工作流程:
DataStream<FlinkEvent> events = env.addSource( consumer ).flatMap(...).assignTimestampsAndWatermarks( new EventTsExtractor() );
DataStream<SessionStatEvent> sessionEvents = events.keyBy(
new KeySelector<FlinkEvent, Tuple2<String, String> >()
{
@Override
public Tuple2<String, String> getKey( FlinkEvent value ) throws Exception {
return(Tuple2.of( value.getF0(), value.getSessionID ) );
}
} )
.window( TumblingEventTimeWindows.of( Time.minutes( 2 ) ) )
.allowedLateness( Time.seconds( 10 ) )
.aggregate( new SessionStatAggregator(), new SessionStatProcessor() );
/* ... */
sessionEvents.addSink( esSinkBuilder.build() );
我第一次遇到
java.lang.exception:org.apache.flink.streaming.runtime.tasks.exceptioninchainedoperatorexception:无法将元素转发给下一个运算符
在 flatMap
操作员和任务继续重新启动。我通过相同的键和窗口时间戳观察到许多不同值的重复结果。
问题1:我猜重复是因为下游操作员在作业重新启动后重复使用消息。我说得对吗?我在修正了错误后重新提交了这份工作 ExceptionInChainedOperatorException
问题。我又在第一个时间窗口中观察到了复制品。在那之后,这项工作似乎很顺利(每个键在一个时间窗口中有一个结果)。
问题2:复制品是从哪里来的?
2条答案
按热度按时间2w3kk1z51#
这就是Flink实现一次语义的方式。在失败的情况下,flink会重放上次成功检查点的事件。需要注意的是,只影响一次状态,而不是只处理/发布一次事件。
回答问题1:是的,每次重新启动都会一次又一次地处理相同的消息回答问题2:错误修复后的第一个窗口会再次处理这些消息;然后一切恢复正常。
gv8xihay2#
... 对于一个窗口,每个键应该有一个结果
这(完全)是不对的。由于允许延迟,任何延迟事件(在允许的延迟时间内)都将导致相关窗口延迟(或者换句话说,额外)触发。使用默认的eventtimetrigger(您似乎正在使用它),每个延迟事件都会触发一个额外的窗口,并且会发出一个更新的窗口结果。