我使用的是kafka 0.10.2,为了接收来自kafka的记录,我有一个如下的消费者投票循环:
while (true) {
ConsumerRecords<Long, String> records = consumer.poll(2000);
int count= 0;
long lasttimestamp = 0;
long lastOffset = 0;
for (ConsumerRecord<Long, String> record : records) {
if (record.timestamp() >= end_time_Stamp) {
reachedEnd = true;
break;
}
result.add(record);
}
if (reachedEnd) break;
if (records == null || records.isEmpty()) break; // dont wait for records
}
这里在轮询循环之前,我们通过使用“offsetsfortimes”api查找偏移量来寻求开始时间戳。我们把记录拿到最后。
使用consumer.poll api获取记录。如何知道消费者轮询超时应传递的值是多少?。目前,我们只是通过试错来做,看看哪一个会起作用。我想应该有更好的办法。
问题:
如何知道可以给consumer.poll api的理想超时值是多少?这取决于什么?它应该作为运行时参数吗?
有时需要的超时值更大。什么会导致所需超时突然激增(如果kafka中的摄取率过高,是否会影响所需的消费者轮询超时配置?)
如何放弃?当没有记录时,我们应该跳出循环。如何可靠地知道如何不过早地跳出循环?
1条答案
按热度按时间mbskvtky1#
轮询超时取决于您的应用程序—如果可以的话,您可以等待更长时间的数据,但是如果您需要在此期间执行某些操作,那么等待太久是没有意义的
这可能是多种原因造成的,包括重新平衡等。
我建议不要在记录列表为空的第一刻就中断,而是将poll定义为1000ms,然后直接计算记录列表为空的次数,如果为空则中断10次(10s)或类似的情况: