要求-仅在手动重新启动或意外失败时使用主题中的最新消息
当flink作业失败并重新启动时,该作业将以还原的检查点开始,这将尝试处理存储在状态中的kafka记录。为了避免旧记录,我尝试更改组id。检查点的记录仍在处理中。
我使用以下代码只处理最新的记录。它起作用了。但唯一的问题是,我不能忽略状态从检查点为Flink卡法克消费者在意外失败的情况下。
代码:myconsumer.setstartfromlatest();
文档:https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-消费者开始位置配置
我唯一的要求就是处理Kafka的最新事件。
谢谢您
暂无答案!
目前还没有任何答案,快来回答吧!