问:如何查询Kafka主题中flink代码中特定消费群体的偏移量(和旁白(如果需要的话,我会在这里提出一个新问题)。如果可能的话,我怎样才能得到偏移量的时间戳?
(我发现有cli工具可以直接查询它,但这不是我想要的,因为它不是在flink作业中以编程方式完成的)
关于整个问题的一些额外的背景知识,但我不想让这个问题过于开放。
我有一个用例,其中数据将从kafkatopic1流入程序(我们称之为p1),经过处理,然后持久化到数据库。p1将位于多节点集群上,因此每个节点将处理多个kafka分区(假设主题有5个节点和50个kafka分区)。如果其中一个节点由于任何原因完全失败,并且有数据正在被处理,那么该数据将丢失。
例如,如果kafkatopic1上有500条消息,而node2已经拉取了10条消息(因此根据偏移量拉取的下一条消息是message 11),但是只有8条消息被完全处理并在节点失败时持久化到数据库中,那么仍在处理的2条消息将丢失。当节点恢复时,它将开始读取消息11,跳过两条丢失的消息(从技术上讲,kafka分区将开始将其消息发送到另一个要处理的节点,因此该分区的偏移量将移动,并且我们不一定知道节点死亡时下一个要处理的消息是什么)。
(注意:当节点死亡时,假设用户注意到并完全关闭p1,因此此时暂时不再处理更多数据)。
所以这就是Flink发挥作用的地方。我想做一个flink作业,可以通过用户的参数告诉p1的消费者组,然后查询kafka主题(也由用户提供)以获得当前偏移量(os1)。然后,flink作业将kafkatopic1的偏移量设置为os1之前的x时间量(x由用户通过args提供),并开始从kafka主题读取消息。然后,它会将读取的每条消息与数据库中的消息进行比较,如果在数据库中找不到该消息,它会将其发送到另一个Kafka主题(kafkatopic2),在重新启动时由p1处理。
1条答案
按热度按时间k3fezbri1#
如果在flink作业中启用了检查点,那么就不应该丢失消息,因为flink也在内部维护偏移量,并且在从失败中恢复之后,它应该从flink上次提交的偏移量中读取。
现在,如果您仍然想找到偏移量并重新开始从偏移量中读取数据,这会变得很棘手,因为您需要按给定的使用者组找到给定主题的所有分区的偏移量。
我不知道如何从现成的flink-kafka消费者api实现这一点,但是您可以将kafka依赖项添加到您的项目中,并从kafka api创建一个kafkaconsumer。一旦你有了消费者,你可以打电话
或
请注意,您仍然需要循环所有分区以获得所有当前偏移量
阅读这里的差异:Kafkajavadoc
一旦有了要从中读取的偏移量,就可以使用以下方法手动指定flink作业中的使用者偏移量:
有关flink kafka消费者的更多信息,请查看此flink kafka连接器