我有一个用例,其中最重要的一点是在所有的消费者记录 KafkaConsumer 已被提取。在这个用例中,不会有任何东西进入管道。怎样才能确保没有什么东西可以拿呢?
KafkaConsumer
dtcbnfnu1#
如果您正在使用 kafka-console-consumer ,您可以指定 timeout-ms 参数来定义它将等待多长时间,直到它被认为不再有消息到来。
kafka-console-consumer
timeout-ms
--timeout-ms <Integer: timeout_ms> If specified, exit if no message is available for consumption for the specified interval.
uqjltbpv2#
Kafka被设计来处理无限的数据流,所以“全部消费”只意味着在一段时间内(1分钟),1小时等没有人发送任何数据-这取决于你。您可以使用(伪代码):
int emptyCount = 0; while (true) { records = Consumer.poll(500); if (records.empty()) { emptyCount++; if (emptyCount >= 100) { break; } continue; } emptyCount = 0; ...process records... }
您可以调整poll中的超时和空周期数,以达到必要的等待时间。
2条答案
按热度按时间dtcbnfnu1#
如果您正在使用
kafka-console-consumer
,您可以指定timeout-ms
参数来定义它将等待多长时间,直到它被认为不再有消息到来。uqjltbpv2#
Kafka被设计来处理无限的数据流,所以“全部消费”只意味着在一段时间内(1分钟),1小时等没有人发送任何数据-这取决于你。
您可以使用(伪代码):
您可以调整poll中的超时和空周期数,以达到必要的等待时间。