消耗到空

cld4siwp  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(356)

我有一个用例,其中最重要的一点是在所有的消费者记录 KafkaConsumer 已被提取。在这个用例中,不会有任何东西进入管道。怎样才能确保没有什么东西可以拿呢?

dtcbnfnu

dtcbnfnu1#

如果您正在使用 kafka-console-consumer ,您可以指定 timeout-ms 参数来定义它将等待多长时间,直到它被认为不再有消息到来。

--timeout-ms <Integer: timeout_ms>      If specified, exit if no message is    
                                          available for consumption for the    
                                          specified interval.
uqjltbpv

uqjltbpv2#

Kafka被设计来处理无限的数据流,所以“全部消费”只意味着在一段时间内(1分钟),1小时等没有人发送任何数据-这取决于你。
您可以使用(伪代码):

int emptyCount = 0;
while (true) {
   records = Consumer.poll(500);
   if (records.empty()) {
      emptyCount++;
      if (emptyCount >= 100) {
         break;
      }
      continue;
   }
   emptyCount = 0;
   ...process records...
}

您可以调整poll中的超时和空周期数,以达到必要的等待时间。

相关问题