我的Kafka有数据积累问题。排除故障后,我发现数据消耗非常耗时一次大约10分钟。我在这里找到了具体的代码。
while (consumerRecords.hasNext()) {
long begin = System.currentTimeMillis();
ConsumerRecord<String, Message> consumerRecord = consumerRecords.next();
long next = System.currentTimeMillis() - begin ;
....
consumerrecords对象的类型是kafkardd,next()方法用了大约40秒返回数据。这导致了数据积累。
这是我的监控信息
2020/10/19 18:03:44.000 7条记录 40分钟 0.4秒 40分钟
2020/10/19 18:03:43.500 2条记录 40分钟 0.4秒 40分钟
2020/10/19 18:03:43.000 7条记录 39分钟 40秒 40分钟
2020/10/19 18:03:42.500 2条记录 39分钟 0.4秒 39分钟
2020/10/19 18:03:42.000 8条记录 39分钟 0.4秒 39分钟
我不知道如何继续解决这个问题,或者是什么原因导致它如此耗时?
请给我一些指导和建议,谢谢
暂无答案!
目前还没有任何答案,快来回答吧!