kafka:如何给消费者投票超时值?

gfttwv5a  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(561)

我使用的是kafka 0.10.2,为了接收来自kafka的记录,我有一个如下的消费者投票循环:

while (true) {
    ConsumerRecords<Long, String> records = consumer.poll(2000);
    int count= 0;
    long lasttimestamp = 0;
    long lastOffset = 0;
    for (ConsumerRecord<Long, String> record : records) {
        if (record.timestamp() >= end_time_Stamp) {
            reachedEnd = true;
            break;
        }
        result.add(record);
    }

    if (reachedEnd) break;
    if (records == null || records.isEmpty()) break; // dont wait for records
}

这里在轮询循环之前,我们通过使用“offsetsfortimes”api查找偏移量来寻求开始时间戳。我们把记录拿到最后。
使用consumer.poll api获取记录。如何知道消费者轮询超时应传递的值是多少?。目前,我们只是通过试错来做,看看哪一个会起作用。我想应该有更好的办法。
问题:
如何知道可以给consumer.poll api的理想超时值是多少?这取决于什么?它应该作为运行时参数吗?
有时需要的超时值更大。什么会导致所需超时突然激增(如果kafka中的摄取率过高,是否会影响所需的消费者轮询超时配置?)
如何放弃?当没有记录时,我们应该跳出循环。如何可靠地知道如何不过早地跳出循环?

mbskvtky

mbskvtky1#

轮询超时取决于您的应用程序—如果可以的话,您可以等待更长时间的数据,但是如果您需要在此期间执行某些操作,那么等待太久是没有意义的
这可能是多种原因造成的,包括重新平衡等。
我建议不要在记录列表为空的第一刻就中断,而是将poll定义为1000ms,然后直接计算记录列表为空的次数,如果为空则中断10次(10s)或类似的情况:

int counter=0;
while(true) {
   consumer.poll(1000);
   for (...) {
   }
   if (records == null || records.isEmpty()) {
     counter++;
      if (counter > 10)
         break;
   } else {
     counter = 0;
   }
}

相关问题