时间窗口关闭延迟?

e0bqpujr  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(243)

我是新来Kafka的。
我使用ktable的suppress方法只处理如下窗口的最终结果:

myStream
    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)).grace(Duration.ofMillis(500)))
    .aggregate(new Aggregation(),
        (k, v, a) -> a,  // Disabled the actual aggregation in order to eliminate possiblities of latency
        materialized.withLoggingDisabled())
    .suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream().peek((k, v) -> log.info("delay " + (System.currentTimeMillis() - k.window().endTime().toEpochMilli())));

这样我就得到了一个日志,每10秒延迟一次,窗口结束时间和调用peek的实际时间之间的差值。我会在这里执行一个非常小的数字,因为这个代码实际上什么都不做。。。
尽管如此,每个键/窗口的延迟时间为4-20秒。
我每个任务使用一个线程(这个主题有5个线程)。
有人能指出我做错了什么吗?
谢谢!
编辑:
使用virtualvm显示,通过sun.nio.ch.selectorimpl.select()消耗的时间约为99%。这意味着afaiu,进程在大多数时间是“空闲”的。
编辑:
似乎更改“commit.interval.ms”(默认情况下为30000)大大减少了延迟。
静止延迟有15秒的峰值,所以问题还没有解决。。。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题