我正在测试kafka producer中的async send()。我要连接到的群集处于脱机状态。我的假设是我可以快速发送10000个请求(listtosend的长度)。下一个超时(60秒)将开始,60秒后我会看到回调击中我 logger.error(s"failed to send record ${x._2}", e)
然而,这个方法似乎要花很长时间才能完成。
这就是为什么我在 logger.debug("test: am I sending data")
线路。
它打印出来,然后在60秒内什么也没发生。我看到第一条记录的回调失败。只有这样,它才会继续前进。
这是正常的行为还是我遗漏了一些基本的东西?
listToSend.foreach { x =>
logger.debug("test: am I sending data")
// note: I added this 'val future =' in an attempt to fix this, to no avail
val future = producer.send(new ProducerRecord[String, String](topic, x._2), new Callback {
override def onCompletion(metadata: RecordMetadata, e: Exception) {
if (e != null) {
//todo: handle failed sends, timeouts, ...
logger.error(s"failed to send record ${x._2}", e)
}
else { //nice to have: implement logic here, or call another method to process metadata
logger.debug("~Callback success~")
}
}
}
)
}
注意:我不想阻止这段代码,我想保持它的异步。但是它似乎在send()上被阻塞。
1条答案
按热度按时间xriantvc1#
我从未完全弄清楚的平行性。
然而,似乎我的主题名(我把它命名为“[projectname here]\u connection”)是问题所在。
即使我不知道在主题名中有任何保留关键字,这种行为还是会突然出现。
一些进一步的实验还发现,带有尾随空格的主题名也会导致这种行为。制作人将尝试将其发送到此主题,但kafka集群似乎不知道如何处理它,从而导致这些超时。
因此,对于所有遇到此问题的人,请在继续进行故障排除之前检查/更改主题名称。