我正在使用kafka和elasticsearch设置flink流处理器。我想重放我的数据,但是当我将并行度设置为大于1时,它并没有完成程序,我认为这是因为kafka流只看到一条消息被标识为流的结尾。
public CustomSchema(Date _endTime) {
endTime = _endTime;
}
@Override
public boolean isEndOfStream(CustomTopicWrapper nextElement) {
if (this.endTime != null && nextElement.messageTime.getTime() >= this.endTime.getTime()) {
return true;
}
return false;
}
有没有办法告诉flink用户组中的所有线程在一个线程完成后结束?
1条答案
按热度按时间6yjfywim1#
如果实现了自己的sourcefunction,请使用
cancel
方法,如下面的示例所示。flinkkafkaconsumerbase类还有cancel方法。