背景:
将使用者拦截器设置为streamsconfig将确保在使用/提交消息时调用拦截器。来自的代码段 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#commitOffsetsSync
```
if (future.succeeded()) {
if (interceptors != null)
interceptors.onCommit(offsets);
return true;
}
但是 `consumerInterceptor.onCommit()` 从未调用,即使我看到偏移量在源主题中提交。
问题:
我想这是因为我使用的kstreams只启用了一次处理保证。
这就是当时的逻辑 `org.apache.kafka.streams.processor.internals.StreamTask#commit` ```
if (this.eosEnabled) {
this.producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, this.applicationId);
this.producer.commitTransaction();
if (startNewTransaction) {
this.producer.beginTransaction();
}
} else {
this.consumer.commitSync(consumedOffsetsAndMetadata);
}
如你所见, consumer.commitSync
依次称为 consumerCoordinator.commit
这就叫 interceptor.onCommit
,不会被调用,因为在启用eos的情况下,调用的是事务api。
问:当在启用eos的源主题上提交偏移量时,有没有一种方法可以将回调挂接到kstream?
暂无答案!
目前还没有任何答案,快来回答吧!