我正在使用flink v1.4.0。我正在使用来自 Kafka
主题使用 Kafka FLink Consumer
按照以下代码:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
myConsumer.setStartFromEarliest(); // start from the earliest record possible
myConsumer.setStartFromLatest(); // start from the latest record
myConsumer.setStartFromGroupOffsets(); // the default behaviour
DataStream<String> stream = env.addSource(myConsumer);
...
有没有办法知道我是否把整个主题都看完了?如何监视偏移(这是一个充分的方式来确认我已经消耗了所有的数据从内部 Kafka
主题?)
2条答案
按热度按时间hpxqektj1#
Kafka它被用作流媒体源,流媒体没有终点。
如果我没搞错的话,flink的kafka连接器每x毫秒从一个主题中提取一次数据,因为所有kafka消费者都是活动消费者,所以kafka不会通知您主题中是否有新数据
所以,在您的例子中,只要设置一个超时,如果您在这个时间内没有读取数据,那么您已经读取了主题中的所有数据。
不管怎样,如果你需要读取一批有限的数据,你可以使用flink的一些窗口或者在你的Kafka主题中引入一些标记,来界定这批数据的开始和结束。
njthzxwz2#
由于Kafka通常用于连续的数据流,所以使用一个主题的“全部”可能是一个有意义的概念,也可能不是。我建议你看看Flink是如何揭露Kafka标准的文档,其中包括以下解释:
所以,如果消费者滞后为零,你就被赶上了。也就是说,您可能希望自己能够比较偏移量,但我不知道有什么简单的方法可以做到这一点。