我有一个 processor
命名 AddCashProcessor
.
在 AddCashProcessor
,我保持 KeyValueStore<String, HashSet<String>>
记录付款人的用户标识 process
方法。代码如下:
@Override
public void process(String key, String value) {
HashSet<String> set = Optional.ofNullable(store.get(key)).orElse(new HashSet<>());
set.add(value);
store.put(key, set);
}
而且在 punctuate
名为的类的方法 AddCashPunctuator
哪个 implements the Punctuator interface
,我明白了 HashSet.size()
要将其插入mysql:
@Override
public void punctuate(long l) {
List<String> updateSqls = new ArrayList<>();
KeyValueIterator<String, HashSet<String>> iter = store.all();
while (iter.hasNext()) {
KeyValue<String, HashSet<String>> entry = iter.next();
int size = entry.getValue().size();
....
}
}
iter.close();
MySqlUtils.update(updateSqls);
}
这个 AddCashPunctuator
注册于 init
中的方法 AddCashProcessor
,如下所示:
@Override
public void init(ProcessorContext context) {
this.context = context;
....
this.context.schedule(30000L, PunctuationType.WALL_CLOCK_TIME, new AddCashPunctuator());
}
我要执行 punctuate
方法每30秒。但它不是这样做的,有时运行良好,有时停顿。暂停结束时,执行多次。
为什么是这个?是因为 HashSet
或者 KeyValueStore
太大了?我的Kafka流版本是1.0.0。我的Kafka版本是0.10.1.1。
谢谢!
1条答案
按热度按时间ycl3bljg1#
在kafka流中,只有一个线程负责常规处理和标点符号。因此,如果处理时间比标点计划长,那么对标点的调用可能会延迟。独立于此,gc暂停可以延迟标点符号。
因此,标点符号(对于任何带有gc暂停的系统)是最好的(即使有一个专用的标点符号线程)。
不幸的是,如果漏掉了一个标点,Kafka流会重放所有漏掉的标点。这就是为什么你一次得到多个。即将发布的1.1版本已经有了一个补丁(https://issues.apache.org/jira/browse/kafka-6323).