我们有一个Flink作业,其拓扑如下:
source -> filter -> map -> sink
我们在接收器操作符open-覆盖功能处设置实时(就绪)状态。在我们获得该状态之后,我们发送事件。有时它不能使用提前发送的事件。
open
我们想知道我们可以发送不会丢失的数据的确切时间/步骤。
vulvrdjw1#
看起来您希望确保没有遗漏要处理的消息。Kafka会保留您的消息,因此没有必要只在Flink消费者准备好时才发送消息。您可以通过避免状态消息来简化设计。
任何Kafka消费者(不仅仅是Flink Connector)都将在Kafka服务器中有一个与其关联的偏移量,以跟踪最后消费的消息的ID。
来自Kafka文档:Kafka为分区中的每条记录维护一个数字偏移量。该偏移量充当该分区内记录的唯一标识符,并且还表示消费者在该分区中的位置。例如,位置5的消费者已经使用了偏移量为0到4的记录,并且接下来将接收偏移量为5的记录
在Flink Kafka连接器中,将偏移量指定为承诺的偏移量。
OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)
这将确保如果重新启动您的Flink连接器,它将从重新启动前的最后一个位置消耗掉。
如果由于某种原因,偏移量丢失,这将从您的Kafka主题的开头(最早的消息)开始阅读。请注意,此方法将导致您重新处理消息。
你可以探索更多的抵消策略来选择适合你的策略。
Reference-https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset
1条答案
按热度按时间vulvrdjw1#
看起来您希望确保没有遗漏要处理的消息。Kafka会保留您的消息,因此没有必要只在Flink消费者准备好时才发送消息。您可以通过避免状态消息来简化设计。
任何Kafka消费者(不仅仅是Flink Connector)都将在Kafka服务器中有一个与其关联的偏移量,以跟踪最后消费的消息的ID。
来自Kafka文档:
Kafka为分区中的每条记录维护一个数字偏移量。该偏移量充当该分区内记录的唯一标识符,并且还表示消费者在该分区中的位置。例如,位置5的消费者已经使用了偏移量为0到4的记录,并且接下来将接收偏移量为5的记录
在Flink Kafka连接器中,将偏移量指定为承诺的偏移量。
这将确保如果重新启动您的Flink连接器,它将从重新启动前的最后一个位置消耗掉。
如果由于某种原因,偏移量丢失,这将从您的Kafka主题的开头(最早的消息)开始阅读。请注意,此方法将导致您重新处理消息。
你可以探索更多的抵消策略来选择适合你的策略。
Reference-https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset