当kafka消费者投票返回空记录时?

0pizxfdo  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(507)

如下图所示,我的代码是一个高级使用者,在kafka服务器中获取一个有32个分区的主题,我不明白为什么有时我会从consumer.poll()得到一个空返回。我尝试增加轮询超时,然后当我将超时增加到1000时,每个轮询都有返回数据,而我将超时设置为10或0,然后我看到很多空返回。
谁能告诉我如何设置正确的超时时间?

def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "kafka-01:9098")
    props.put("group.id", "kch1")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    //props.put("max.poll.records", "1000")
    val consumers = new Array[KafkaConsumer[String, String]](16)
    for(i <- 0 to 15) {
      consumers(i) = new KafkaConsumer[String, String](props)
      consumers(i).subscribe(util.Arrays.asList("veh321"))
    }
    var cnt = 0
    var cacheIterator: Iterator[ConsumerRecord[String, String]] = null
    for(i <- 0 to 15) {
      new Thread(new Runnable {
        override def run(): Unit = {
          var finish = false
          while(!finish) {
            val start = System.currentTimeMillis()
            cacheIterator = consumers(i).poll(100).iterator()
            val end = System.currentTimeMillis() - start
            if (end > 10 ) {
              println(s"${Thread.currentThread().getId} + Duration is ${end}, ${cacheIterator.hasNext} ${cacheIterator.size}")
            }
          }
        }
      }).start()
    }
2ic8powd

2ic8powd1#

java使用者通过调用java.nio.channels.selector.select(超时)将linux的epoll作为底层实现方案。如果您只给它100毫秒的时间来尝试在短时间间隔内准备了多少个selectionkeys,它很可能什么都不返回。
此外,在相同的100ms内,消费者还需要做一些其他的工作,包括轮询协调器状态,因此记录轮询的实时间隔明显小于100ms,这使得检索一些真正有用的东西变得更加困难。

相关问题