kafka streams低级处理器api的标点符号不定期运行

mgdq6dx1  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(265)

我有一个 processor 命名 AddCashProcessor .
AddCashProcessor ,我保持 KeyValueStore<String, HashSet<String>> 记录付款人的用户标识 process 方法。代码如下:

@Override
public void process(String key, String value) {
    HashSet<String> set = Optional.ofNullable(store.get(key)).orElse(new HashSet<>());
    set.add(value);
    store.put(key, set);
}

而且在 punctuate 名为的类的方法 AddCashPunctuator 哪个 implements the Punctuator interface ,我明白了 HashSet.size() 要将其插入mysql:

@Override
public void punctuate(long l) {
    List<String> updateSqls = new ArrayList<>();

    KeyValueIterator<String, HashSet<String>> iter = store.all();
    while (iter.hasNext()) {
        KeyValue<String, HashSet<String>> entry = iter.next();
            int size = entry.getValue().size();
            ....
        }
    }
    iter.close();

    MySqlUtils.update(updateSqls);

}

这个 AddCashPunctuator 注册于 init 中的方法 AddCashProcessor ,如下所示:

@Override
public void init(ProcessorContext context) {
    this.context = context;
    ....
    this.context.schedule(30000L, PunctuationType.WALL_CLOCK_TIME, new AddCashPunctuator());
}

我要执行 punctuate 方法每30秒。但它不是这样做的,有时运行良好,有时停顿。暂停结束时,执行多次。
为什么是这个?是因为 HashSet 或者 KeyValueStore 太大了?我的Kafka流版本是1.0.0。我的Kafka版本是0.10.1.1。
谢谢!

ycl3bljg

ycl3bljg1#

在kafka流中,只有一个线程负责常规处理和标点符号。因此,如果处理时间比标点计划长,那么对标点的调用可能会延迟。独立于此,gc暂停可以延迟标点符号。
因此,标点符号(对于任何带有gc暂停的系统)是最好的(即使有一个专用的标点符号线程)。
不幸的是,如果漏掉了一个标点,Kafka流会重放所有漏掉的标点。这就是为什么你一次得到多个。即将发布的1.1版本已经有了一个补丁(https://issues.apache.org/jira/browse/kafka-6323).

相关问题