我有一个有时间表的变压器
context.schedule(scanFrequency, PunctuationType.WALL_CLOCK_TIME, new MyPunctuator(stateStore));
然后我的标点符号课
public class MyPunctuator implements Punctuator {
@Override
public void punctuate(final long timestamp) {
}
}
现在奇怪的事情是,当调度工作时,每个迭代调用标点符号4次
[StreamThread-1] INFO MyPunctuator - [Punctuator Scan] - Timestamp 1660083164829
[StreamThread-1] INFO MyPunctuator - store=0
[StreamThread-1] INFO MyPunctuator - [Punctuator Scan] - Timestamp 1660083164830
[StreamThread-1] INFO MyPunctuator - store=1
[StreamThread-1] INFO MyPunctuator - [Punctuator Scan] - Timestamp 1660083164831
[StreamThread-1] INFO MyPunctuator - store=0
[StreamThread-1] INFO MyPunctuator - [Punctuator Scan] - Timestamp 1660083164832
[StreamThread-1] INFO MyPunctuator - store=0
知道为什么吗
2条答案
按热度按时间nxowjjhe1#
基于标点符号类型,这就是Schedule - Punctuate的工作方式。您可以将此示例与您的用例进行比较。
PunctuationType。流时间
如果基于PunctuationType每10秒安排一次Punctuator函数。STREAM_TIME,如果你处理一个60条记录的流,时间戳从1(第一条记录)到60秒(最后一条记录),那么punctuate()将被调用6次。无论实际处理这些记录所需的时间如何,都会发生这种情况。punctuate()将被调用6次,而不管处理这60条记录是花费一秒、一分钟还是一小时。
PunctuationType。挂钟时间
当wall-clock-time(挂钟时间)e. PunctuationType。WALL_CLOCK_TIME),punctuate()则完全由挂钟时间触发。如果基于PunctuationType调度Punctuator函数,则重用上述示例。WALL_CLOCK_TIME,如果这60条记录在20秒内被处理,则punctuate()被调用2次(每10秒一次)。如果这60条记录在5秒内被处理,那么根本不会调用punctuate()。请注意,通过在init()方法中多次调用ProcessorContext#schedule(),可以在同一处理器中调度具有不同PunctuationType类型的多个Punctuator回调。
v7pvogib2#
您将为每个流线程获得一个标点符号。