当我的Producer完成将其所有消息流到Kafka,并且在Flink完成处理它们之后,我希望能够终止Flink作业,以便它不会继续运行,并且我也可以知道Flink何时完成了对所有数据的处理。我也不能使用批处理,因为我需要Flink与我的Kafka流并行运行。
通常,Flink在DeserializationSchema
类中使用isEndOfStream
方法来查看是否应该提前结束(在该方法中返回TRUE将自动结束作业)。然而,当使用Kafka作为Flink的源时,新的KafkaSource
类已经不推荐在反序列化程序中使用isEndOfStream
方法,并且不再检查它以确定流是否应该结束。有没有其他方法可以提前终止Flink作业?
1条答案
按热度按时间8ftvxx2r1#
KafkaSource
提供的对有界流进行操作的机制是将setBounded
或setUnbounded
与构建器一起使用,如setBounded
表示,一旦源使用完指定偏移量的所有数据,就应该停止源。可以使用
setUnbounded
来指示,虽然源不应该读取超过指定偏移量的任何数据,但它应该保持运行。这允许源在流模式下运行时参与检查点设置。如果你事先知道你有多想看书,这个方法就行得通。我使用了带有特定时间戳的
setBounded
,例如,也是这样的