在flink作业中,我想在构造状态24小时后删除它。我查看了这篇文章并设置了状态生存时间(ttl),但是正如本文中提到的,状态删除是惰性/被动的,这可能会导致内存泄漏。
例如,在23小时57分钟之后,我收到了key的最后一条消息('usa','male',2018),之后就再也没有这个key的消息了。然后我将无法调用该键的函数和状态的ttl('usa','male',2018),然后它将永远保存在内存中。
本文提到使用计时器: The idea is to register a timer with the TTL per state value and access. When the timer elapses, the state can be cleared if no other state access happened since the timer was registered.
但我不知道怎么做。
我在考虑使用 ProcessFunction
它有一个 onTimer()
方法。我的计划是注册一个 ProcessingTimeTimer
在它的 open()
方法,并删除中的状态 onTimer()
,但我不知道这个计时器是否也是被动触发的,这意味着如果没有调用 ProcessFunction
即使24小时后。
1条答案
按热度按时间afdcj2ne1#
使用
ProcessFunction
因为这是个好主意。这个ProcessFunction
将必须保持有问题的键控状态,并且将知道对该状态的所有读写操作,您可以使用这些操作以任何对应用程序有意义的方式创建和删除计时器。计时器是键控的(与状态键控的方式相同),处理时间计时器将按计划启动,而不管该键(或其他键)的流活动或不活动。如果作业在计划时间因某种原因关闭,则在作业恢复时,应该在中断期间触发的处理时间计时器将触发。