提前触发窗口实现问题-收到重复元素

aoyhnmkz  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(280)

我很难理解Flink窗口的原则,如果你能给我指出正确的方向,我将非常高兴。
我的目的是统计一个时间间隔内的重复事件数,如果重复事件数大于阈值,则生成警报事件。
据我所知,窗口是这个场景的完美匹配。
如果窗口中的重复事件计数为2,则需要生成早期警报(即,应在不等待窗口结束的情况下生成警报)。
我认为alert event generating process window函数可用于聚合窗口化事件,而自定义触发器可用于根据循环事件计数(在水印到达窗口的结束时间戳之前)从窗口发出早期结果。
我正在使用事件时间语义,对自定义触发器有问题。
您可以在要点中找到实际实现:https://gist.github.com/simpleusr/7c56d4384f6fc9f0a61860a680bb5f36
我使用键控状态来跟踪窗口中的元素计数 encounteredElementsCountState 收到第一个元素后,我注册 EventTimeTimer 到Windows的尽头。这应该会触发 FIRE_AND_PURGE 用于按预期关闭窗口和工作。
如果计数超过临界值,我会尝试提前开火。这似乎也很成功, processwindow 函数在触发后立即被调用。
问题是,我必须插入下面的检查代码没有理解的原因。因为之前收集的元素再次提供给 onElement 方法:

if (ctx.getCurrentWatermark() < 0) {
            logger.debug(String.format("onElement processing skipped for eventId : %s for watermark: %s ", element.getEventId(), ctx.getCurrentWatermark()));
            return TriggerResult.CONTINUE;
        }

我想不出原因。我看到的是,当这种情况发生时,水印值是 (ctx.getCurrentWatermark()) Long.MIN_VALUE (导致上述检查)。怎么会这样?
此检查似乎避免了重复的早期事件生成,但我不知道为什么会发生这种情况,以及此解决方法是否合适。
你能告诉我为什么同样的元素在窗口中被处理两次吗?
另一个问题是关于键控状态的用法。在窗口被释放后,这个实现是否泄漏任何状态?我正在尝试用清除触发器的方法清除所有使用过的状态,但这样就足够了吗?
当做。

oalqel3c

oalqel3c1#

每个任务都将currentwatermark初始化为long.min\u值,这将保留currentwatermark的本地值,直到从该任务的所有输入流中到达较大的水印为止。希望知道这一点能帮助你更好地理解发生了什么。
值得一提的是,用processfunction实现这种逻辑通常比用windowapi更简单。

相关问题