我有一个关于flink kafka消费者(flinkkafkaconsumer09)的问题。我一直在使用这个版本的连接器:flink-connector-kafka-0.9_2.11-1.1.2(连接器版本是0.9,akka版本是2.11,flink版本是1.1.2)
我在5分钟内从Kafka收集通讯数据。据我所见,窗口与系统时间一致(例如,窗口在12:45、12:50、12:55、13:00等结束),在窗口关闭后,其记录将被处理/聚合,并通过sink操作符发送到数据库。
我的程序的简化版本:
env.addSource(new FlinkKafkaConsumer09<>(topicName,jsonMapper, properties))
.keyBy("srcIp", "dstIp", "dstPort")
.window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.MINUTES)))
.apply(new CounterSum<>())
.addSink(new DbSink(...));
不过,我需要提交Kafka抵消。据我所知,flinkkafkaconsumer09中唯一的方法就是打开检查点。我是这样做的:
env.enableCheckpointing(300000); // 5 minutes
检查点存储所有操作符的状态。检查点完成后,偏移量将提交给Kafka。我的检查点是通过fsstatebendback存储在taskmanager系统文件结构中的(第一个问题-旧的检查点数据没有被删除,我看到一些错误被报告了)。第二个问题是何时触发检查点。如果在窗口的开头触发,则生成的检查点文件很小,而在窗口关闭之前触发,则生成的状态很大(例如50mb),因为此窗口中已经有许多通信记录。检查点进程通常需要不到1-2秒的时间,但是当窗口关闭后触发检查点,并且在处理聚合和db接收器时,检查点进程需要45秒。
但关键是我根本不需要状态检查点。我只需要在窗口关闭、处理并将结果数据下沉到db(或在另一个窗口的开头)之后将offset提交给kafka。若发生故障转移,flink将从kafka获取最后一个偏移量,并再次读取最后5分钟间隔的数据。因为最后一个失败的结果并没有被发送到db,所以不会有重复的数据被发送到db,并且重新读取最后5分钟的间隔是没有开销的。
所以基本上我有两个问题:
有没有什么方法可以实现关闭检查点并且只提交上面描述的偏移量?
如果没有,有没有办法让检查点和窗口的开头对齐?我阅读了flink文档—有一个叫做保存点(即手动检查点)的特性,但它是从命令行使用的。我需要从窗口开始时的代码中调用savepoint—状态将很小,检查点过程将很快。
暂无答案!
目前还没有任何答案,快来回答吧!