Flink状态函数-org中的不可靠行为,apache,flink,statefun,sdk,上下文::sendAfter

hrirmatl  于 2022-12-09  发布在  Apache
关注(0)|答案(1)|浏览(100)

我们使用SF(状态函数)开发了一种类似超时的机制,其中我们有一个TimeoutManagerFunction,它保存和管理由其他函数设置的当前超时状态,并在超时时发回一条过期消息。
为了实现该行为,我们的TimeoutManagerFunction在X秒后向自身发送一条消息。(X表示超时到实际超时所需的秒数,在初始设置时从timeout setter函数传递)
超时设置器函数还可以通过向TimeoutManagerFunction发送更新消息来更新超时的到期时间,或者可以取消它。TimeoutManagerFunction根据收到的消息编辑到期时间或取消它。
我们目前有10个左右不同的函数与此TimeoutManagerFunction交互,其中一些函数几乎每秒都在设置/更新/取消超时。
因此,在TimeoutManagerFunction的set()、update()和cancel()方法上,我们基本上执行以下操作:
context.sendAfter(Duration.ofMillis(cancelableTimeout.getTimeoutDuration()), context.self(), selfTimeout);
TimeoutManagerFunction的timeout()方法上,我们检查给定的超时是否确实过期(因为有可能同时更新其过期日期):

protected void timeout(Context context, SelfTimeout selfTimeout) {
        try {
            // **timeouts** is the internal state of timeouts in TimeoutManagerFunction
            CancelableTimeout cancelableTimeout = timeouts.get(selfTimeout.getId());
            if (cancelableTimeout != null) {
                if (Instant.now().isAfter(Instant.ofEpochMilli(cancelableTimeout.getExpiresAt()))) {
                    /// send back timed out message back to initial timeout setter function...

我们遇到的问题基本上是,如果我们执行context.sendAfter(10秒,timeout对象)之类的操作,* 有时 * 需要超过10秒才能收到该消息。如果您对如何或为什么会发生这种情况有任何想法,我们将不胜感激。
我们的SF版本是2.2.1,Flink版本是1.11.6

ds97pgxw

ds97pgxw1#

无法保证会严格遵守请求的计时。如果群集相对于正在执行的处理量配置不足,则很可能会延迟触发某些计时器。

相关问题