一种自定义处理器,它以简单的方式缓冲事件 java.util.List
在 process()
-此缓冲区不是状态存储区。
每30秒一次, punctuate()
将此列表排序并刷新到Flume。假设只有一个分区源和汇。需要eos处理保证。
我也知道在任何时候 process()
被执行或 punctuate()
被执行。
我担心这个缓冲区没有得到changelog主题的支持。理想情况下,我认为这应该是一个支持eos的状态存储。
但有一种观点认为 commit.interval
超过30秒(例如40秒)将确保缓冲区中的事件不会丢失。而且因为我们使用 WALL_CLOCK_TIME
,的 punctuate()
无论我们是否有事件,总是每30秒调用一次。
这是一个有效的论点吗?这里有哪些情况会使缓冲区中的事件永远丢失?
@Override
public void init(ProcessorContext processorContext) {
super.init(processorContext);
this.buffer = new ArrayList<>();
context().schedule(Duration.ofSeconds(20L), PunctuationType.WALL_CLOCK_TIME, this::flush);
}
void flush(long timestamp){
LOG.info("Punctuator invoked.....");
buffer.stream().sorted(Comparator.comparing(o -> o.getId())).forEach(
i -> context().forward(i.getId(), i)
);
}
@Override
public void process(String key, Customer value) {
LOG.info("Processing {}", key);
buffer.add(value);
}
6条答案
按热度按时间wj8zmpe11#
----30秒---
因此,调优commit和标点间隔将抑制对状态存储的需求这一论点是无效的。
plupiseo2#
----30秒
snvhrwxg3#
20s-------20s------|
c o m it:|--------30秒
wvt8vs2t4#
我有点反对调优commit和标点间隔并称这种设置为万无一失的。
从文件,墙上的时钟
这是最大的努力,因为它的粒度受到处理循环迭代完成所需时间的限制
有可能“错过”一个标点,如果:与标点类型#墙#时钟Š时间,在gc暂停,间隔太短
理想:
标点符号:|-----20s-------20s-------20s-------20s-------20s------|
c o m it:|--------30秒
xmq68pz95#
----30秒---
说
process()
花了太多时间(比如说18秒)所以punctuate()
在第40秒时没有为第二次运行调用-因为如doc所述,间隔太短。现在是第31秒,如果应用程序崩溃,即使启用了eos,缓冲区中的事件也会在源位置提交。重新启动时,缓冲区将丢失。
标点符号:|-----20s-------process()
zaqlnxep6#
----30秒