如何在Flink设立Kafka委员会?

gstyhher  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(246)

我已经设置了承诺抵消:

properties.setProperty("auto.commit.interval.ms", "1000");
properties.setProperty("auto.commit.enable", "true");
properties.setProperty("enable.auto.commit", "true");

FlinkKafkaConsumer08<MobilePageEvent> kafkaConsumer =
            new FlinkKafkaConsumer08<>(
                    "mobile-event.page-resource", SCHEMA, properties);

但在web ui中,我得到了无效的提交偏移量:

3bygqnnd

3bygqnnd1#

启用检查点后,它现在可以工作:

StreamExecutionEnvironment environment =
            StreamExecutionEnvironment.getExecutionEnvironment();

environment.enableCheckpointing(5000);

相关问题