我有一个Kafka消费者,我创建了一个时间表。它尝试使用自上次提交以来添加的所有新消息。
我想关闭消费者一旦消费的所有新消息在日志中,而不是无限期等待新消息进来。
我很难通过Kafka的文件找到解决办法。
我在confluent.kafka.consumerconfig和clientconfig类中看到了许多与超时相关的属性,包括fetchwaitmaxms,但无法解释要使用哪个。我正在使用.net客户端。
如有任何建议,将不胜感激。
我有一个Kafka消费者,我创建了一个时间表。它尝试使用自上次提交以来添加的所有新消息。
我想关闭消费者一旦消费的所有新消息在日志中,而不是无限期等待新消息进来。
我很难通过Kafka的文件找到解决办法。
我在confluent.kafka.consumerconfig和clientconfig类中看到了许多与超时相关的属性,包括fetchwaitmaxms,但无法解释要使用哪个。我正在使用.net客户端。
如有任何建议,将不胜感激。
2条答案
按热度按时间huus2vyu1#
我从未使用过.net客户机,但假设它与java客户机没有那么大的不同,那么
poll()
方法应接受以毫秒为单位的超时值,因此将其设置为5000
在大多数情况下都应该有效。不需要摆弄配置类。另一种方法是在创建消费者时找到最大偏移量,并且只读取到该偏移量。这将从理论上防止你的消费者无限期运行,如果有任何机会,它不是消费的速度,生产者生产。但我从未尝试过这种方法。
zfciruhq2#
我找到了解决办法。confluent的.net kafka库的1.0.0-beta2版本提供了一个名为
.Consume(TimeSpan timeSpan)
. 如果没有要使用的新消息,或者处于分区eof,则返回null。我以前用的是.Consume(CancellationToken cancellationToken)
过载阻塞并阻止我关闭消费者。更多信息:https://github.com/confluentinc/confluent-kafka-dotnet/issues/614#issuecomment-433848857另一个选择是升级到1.0.0-beta3版本,该版本在名为ispartitioneof的consumeresult对象上提供一个布尔标志。这就是我最初寻找的-一种知道何时到达分区末尾的方法。