使用Kafka信号源时终止Flink作业

5f0d552i  于 2022-10-07  发布在  Kafka
关注(0)|答案(1)|浏览(202)

当我的Producer完成将其所有消息流到Kafka,并且在Flink完成处理它们之后,我希望能够终止Flink作业,以便它不会继续运行,并且我也可以知道Flink何时完成了对所有数据的处理。我也不能使用批处理,因为我需要Flink与我的Kafka流并行运行。

通常,Flink在DeserializationSchema类中使用isEndOfStream方法来查看是否应该提前结束(在该方法中返回TRUE将自动结束作业)。然而,当使用Kafka作为Flink的源时,新的KafkaSource类已经不推荐在反序列化程序中使用isEndOfStream方法,并且不再检查它以确定流是否应该结束。有没有其他方法可以提前终止Flink作业?

8ftvxx2r

8ftvxx2r1#

KafkaSource提供的对有界流进行操作的机制是将setBoundedsetUnbounded与构建器一起使用,如

KafkaSource<String> source = KafkaSource
        .<String>builder()
        .setBootstrapServers(...)
        .setGroupId(...)
        .setTopics(...)
        .setDeserializer(...) // or setValueOnlyDeserializer
        .setStartingOffsets(...)
        .setBounded(...) // or setUnbounded
        .build();

setBounded表示,一旦源使用完指定偏移量的所有数据,就应该停止源。

可以使用setUnbounded来指示,虽然源不应该读取超过指定偏移量的任何数据,但它应该保持运行。这允许源在流模式下运行时参与检查点设置。

如果你事先知道你有多想看书,这个方法就行得通。我使用了带有特定时间戳的setBounded,例如,

.setBounded(
    OffsetsInitializer.timestamp(
      Instant.parse("2021-10-31T23:59:59.999Z").toEpochMilli()))

也是这样的

.setBounded(OffsetsInitializer.latest())

相关问题