Flink工作什么时候可以消耗Kafka?

ehxuflar  于 2022-09-21  发布在  Flink
关注(0)|答案(1)|浏览(216)

我们有一个Flink作业,其拓扑如下:

source -> filter -> map -> sink

我们在接收器操作符open-覆盖功能处设置实时(就绪)状态。在我们获得该状态之后,我们发送事件。有时它不能使用提前发送的事件。

我们想知道我们可以发送不会丢失的数据的确切时间/步骤。

vulvrdjw

vulvrdjw1#

看起来您希望确保没有遗漏要处理的消息。Kafka会保留您的消息,因此没有必要只在Flink消费者准备好时才发送消息。您可以通过避免状态消息来简化设计。

任何Kafka消费者(不仅仅是Flink Connector)都将在Kafka服务器中有一个与其关联的偏移量,以跟踪最后消费的消息的ID。

来自Kafka文档:
Kafka为分区中的每条记录维护一个数字偏移量。该偏移量充当该分区内记录的唯一标识符,并且还表示消费者在该分区中的位置。例如,位置5的消费者已经使用了偏移量为0到4的记录,并且接下来将接收偏移量为5的记录

在Flink Kafka连接器中,将偏移量指定为承诺的偏移量。

OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)

这将确保如果重新启动您的Flink连接器,它将从重新启动前的最后一个位置消耗掉。

如果由于某种原因,偏移量丢失,这将从您的Kafka主题的开头(最早的消息)开始阅读。请注意,此方法将导致您重新处理消息。

你可以探索更多的抵消策略来选择适合你的策略。

Reference-https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset

相关问题