Flink流没有结束

1zmg4dgp  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(298)

我正在使用kafka和elasticsearch设置flink流处理器。我想重放我的数据,但是当我将并行度设置为大于1时,它并没有完成程序,我认为这是因为kafka流只看到一条消息被标识为流的结尾。

public CustomSchema(Date _endTime) {
        endTime = _endTime;
    }

@Override
    public boolean isEndOfStream(CustomTopicWrapper nextElement) {
        if (this.endTime != null && nextElement.messageTime.getTime() >= this.endTime.getTime()) {
            return true;
        }
        return false;
    }

有没有办法告诉flink用户组中的所有线程在一个线程完成后结束?

6yjfywim

6yjfywim1#

如果实现了自己的sourcefunction,请使用 cancel 方法,如下面的示例所示。flinkkafkaconsumerbase类还有cancel方法。

相关问题