我可以依靠kafka流中的内存java集合通过微调标点和提交间隔来缓冲事件吗?

ej83mcc0  于 2021-06-04  发布在  Kafka
关注(0)|答案(6)|浏览(263)

一种自定义处理器,它以简单的方式缓冲事件 java.util.Listprocess() -此缓冲区不是状态存储区。
每30秒一次, punctuate() 将此列表排序并刷新到Flume。假设只有一个分区源和汇。需要eos处理保证。
我也知道在任何时候 process() 被执行或 punctuate() 被执行。
我担心这个缓冲区没有得到changelog主题的支持。理想情况下,我认为这应该是一个支持eos的状态存储。
但有一种观点认为 commit.interval 超过30秒(例如40秒)将确保缓冲区中的事件不会丢失。而且因为我们使用 WALL_CLOCK_TIME ,的 punctuate() 无论我们是否有事件,总是每30秒调用一次。
这是一个有效的论点吗?这里有哪些情况会使缓冲区中的事件永远丢失?

@Override
public void init(ProcessorContext processorContext) {
    super.init(processorContext);
    this.buffer = new ArrayList<>();
    context().schedule(Duration.ofSeconds(20L), PunctuationType.WALL_CLOCK_TIME, this::flush);
}

void flush(long timestamp){
    LOG.info("Punctuator invoked.....");
    buffer.stream().sorted(Comparator.comparing(o -> o.getId())).forEach(
            i -> context().forward(i.getId(), i)
    );
}

@Override
public void process(String key, Customer value) {
    LOG.info("Processing {}", key);
    buffer.add(value);
}
wj8zmpe1

wj8zmpe11#

----30秒---
因此,调优commit和标点间隔将抑制对状态存储的需求这一论点是无效的。

snvhrwxg

snvhrwxg3#

20s-------20s------|
c o m it:|--------30秒

wvt8vs2t

wvt8vs2t4#

我有点反对调优commit和标点间隔并称这种设置为万无一失的。
从文件,墙上的时钟
这是最大的努力,因为它的粒度受到处理循环迭代完成所需时间的限制
有可能“错过”一个标点,如果:与标点类型#墙#时钟Š时间,在gc暂停,间隔太短

理想:

标点符号:|-----20s-------20s-------20s-------20s-------20s------|
c o m it:|--------30秒

xmq68pz9

xmq68pz95#

----30秒---
process() 花了太多时间(比如说18秒)所以 punctuate() 在第40秒时没有为第二次运行调用-因为如doc所述,间隔太短。
现在是第31秒,如果应用程序崩溃,即使启用了eos,缓冲区中的事件也会在源位置提交。重新启动时,缓冲区将丢失。
标点符号:|-----20s-------process()

相关问题