我在kafka流的条件中使用context.commit。但是,即使条件不满足并且没有执行commit,kafka流也会移动到下一个记录,而当前记录永远不会被执行。你能告诉我怎么做这个手工提交吗。
if ( boolean == true)
{
this.context.forward(key, eventMessage, To.child(TopologyConstants.SINK));
this.context.commit();
}
// even though the condition is not meet and the context is not committed it moves to the
// next record
暂无答案!
目前还没有任何答案,快来回答吧!