我们使用SF(状态函数)开发了一种类似超时的机制,其中我们有一个TimeoutManagerFunction,它保存和管理由其他函数设置的当前超时状态,并在超时时发回一条过期消息。
为了实现该行为,我们的TimeoutManagerFunction在X秒后向自身发送一条消息。(X表示超时到实际超时所需的秒数,在初始设置时从timeout setter函数传递)
超时设置器函数还可以通过向TimeoutManagerFunction发送更新消息来更新超时的到期时间,或者可以取消它。TimeoutManagerFunction根据收到的消息编辑到期时间或取消它。
我们目前有10个左右不同的函数与此TimeoutManagerFunction交互,其中一些函数几乎每秒都在设置/更新/取消超时。
因此,在TimeoutManagerFunction的set()、update()和cancel()方法上,我们基本上执行以下操作:context.sendAfter(Duration.ofMillis(cancelableTimeout.getTimeoutDuration()), context.self(), selfTimeout);
在TimeoutManagerFunction的timeout()方法上,我们检查给定的超时是否确实过期(因为有可能同时更新其过期日期):
protected void timeout(Context context, SelfTimeout selfTimeout) {
try {
// **timeouts** is the internal state of timeouts in TimeoutManagerFunction
CancelableTimeout cancelableTimeout = timeouts.get(selfTimeout.getId());
if (cancelableTimeout != null) {
if (Instant.now().isAfter(Instant.ofEpochMilli(cancelableTimeout.getExpiresAt()))) {
/// send back timed out message back to initial timeout setter function...
我们遇到的问题基本上是,如果我们执行context.sendAfter(10秒,timeout对象)之类的操作,* 有时 * 需要超过10秒才能收到该消息。如果您对如何或为什么会发生这种情况有任何想法,我们将不胜感激。
我们的SF版本是2.2.1,Flink版本是1.11.6
1条答案
按热度按时间ds97pgxw1#
无法保证会严格遵守请求的计时。如果群集相对于正在执行的处理量配置不足,则很可能会延迟触发某些计时器。