我怎么知道我已经消耗了所有的Kafka主题?

t1rydlwq  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(317)

我正在使用flink v1.4.0。我正在使用来自 Kafka 主题使用 Kafka FLink Consumer 按照以下代码:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
myConsumer.setStartFromEarliest();     // start from the earliest record possible
myConsumer.setStartFromLatest();       // start from the latest record
myConsumer.setStartFromGroupOffsets(); // the default behaviour

DataStream<String> stream = env.addSource(myConsumer);
...

有没有办法知道我是否把整个主题都看完了?如何监视偏移(这是一个充分的方式来确认我已经消耗了所有的数据从内部 Kafka 主题?)

hpxqektj

hpxqektj1#

Kafka它被用作流媒体源,流媒体没有终点。
如果我没搞错的话,flink的kafka连接器每x毫秒从一个主题中提取一次数据,因为所有kafka消费者都是活动消费者,所以kafka不会通知您主题中是否有新数据
所以,在您的例子中,只要设置一个超时,如果您在这个时间内没有读取数据,那么您已经读取了主题中的所有数据。
不管怎样,如果你需要读取一批有限的数据,你可以使用flink的一些窗口或者在你的Kafka主题中引入一些标记,来界定这批数据的开始和结束。

njthzxwz

njthzxwz2#

由于Kafka通常用于连续的数据流,所以使用一个主题的“全部”可能是一个有意义的概念,也可能不是。我建议你看看Flink是如何揭露Kafka标准的文档,其中包括以下解释:

The difference between the committed offset and the most recent offset in 
each partition is called the consumer lag. If the Flink topology is consuming 
the data slower from the topic than new data is added, the lag will increase 
and the consumer will fall behind. For large production deployments we 
recommend monitoring that metric to avoid increasing latency.

所以,如果消费者滞后为零,你就被赶上了。也就是说,您可能希望自己能够比较偏移量,但我不知道有什么简单的方法可以做到这一点。

相关问题