Kafka Stream每次迭代执行多个标点符号

7xllpg7q  于 2023-04-29  发布在  Apache
关注(0)|答案(2)|浏览(125)

我有一个有时间表的变压器

context.schedule(scanFrequency, PunctuationType.WALL_CLOCK_TIME, new MyPunctuator(stateStore));

然后我的标点符号课

public class MyPunctuator implements Punctuator {

    @Override
    public void punctuate(final long timestamp) {
    }
}

现在奇怪的事情是,当调度工作时,每个迭代调用标点符号4次

[StreamThread-1] INFO MyPunctuator  - [Punctuator Scan] - Timestamp 1660083164829
[StreamThread-1] INFO MyPunctuator  - store=0
[StreamThread-1] INFO MyPunctuator  - [Punctuator Scan] - Timestamp 1660083164830
[StreamThread-1] INFO MyPunctuator  - store=1
[StreamThread-1] INFO MyPunctuator  - [Punctuator Scan] - Timestamp 1660083164831
[StreamThread-1] INFO MyPunctuator  - store=0
[StreamThread-1] INFO MyPunctuator  - [Punctuator Scan] - Timestamp 1660083164832
[StreamThread-1] INFO MyPunctuator  - store=0

知道为什么吗

nxowjjhe

nxowjjhe1#

基于标点符号类型,这就是Schedule - Punctuate的工作方式。您可以将此示例与您的用例进行比较。

PunctuationType。流时间

如果基于PunctuationType每10秒安排一次Punctuator函数。STREAM_TIME,如果你处理一个60条记录的流,时间戳从1(第一条记录)到60秒(最后一条记录),那么punctuate()将被调用6次。无论实际处理这些记录所需的时间如何,都会发生这种情况。punctuate()将被调用6次,而不管处理这60条记录是花费一秒、一分钟还是一小时。

PunctuationType。挂钟时间

当wall-clock-time(挂钟时间)e. PunctuationType。WALL_CLOCK_TIME),punctuate()则完全由挂钟时间触发。如果基于PunctuationType调度Punctuator函数,则重用上述示例。WALL_CLOCK_TIME,如果这60条记录在20秒内被处理,则punctuate()被调用2次(每10秒一次)。如果这60条记录在5秒内被处理,那么根本不会调用punctuate()。请注意,通过在init()方法中多次调用ProcessorContext#schedule(),可以在同一处理器中调度具有不同PunctuationType类型的多个Punctuator回调。

v7pvogib

v7pvogib2#

您将为每个流线程获得一个标点符号。

相关问题