kafka生产者在回调时阻塞

s4chpxco  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(598)

我正在测试kafka producer中的async send()。我要连接到的群集处于脱机状态。我的假设是我可以快速发送10000个请求(listtosend的长度)。下一个超时(60秒)将开始,60秒后我会看到回调击中我 logger.error(s"failed to send record ${x._2}", e) 然而,这个方法似乎要花很长时间才能完成。
这就是为什么我在 logger.debug("test: am I sending data") 线路。
它打印出来,然后在60秒内什么也没发生。我看到第一条记录的回调失败。只有这样,它才会继续前进。
这是正常的行为还是我遗漏了一些基本的东西?

listToSend.foreach { x =>
        logger.debug("test: am I sending data")
        // note: I added this 'val future =' in an attempt to fix this, to no avail
        val future = producer.send(new ProducerRecord[String, String](topic, x._2), new Callback {
          override def onCompletion(metadata: RecordMetadata, e: Exception) {

            if (e != null) {
              //todo: handle failed sends, timeouts, ...
              logger.error(s"failed to send record ${x._2}", e)
            }
            else { //nice to have: implement logic here, or call another method to process metadata
              logger.debug("~Callback success~")
            }
          }
        }
        )
      }

注意:我不想阻止这段代码,我想保持它的异步。但是它似乎在send()上被阻塞。

xriantvc

xriantvc1#

我从未完全弄清楚的平行性。
然而,似乎我的主题名(我把它命名为“[projectname here]\u connection”)是问题所在。
即使我不知道在主题名中有任何保留关键字,这种行为还是会突然出现。
一些进一步的实验还发现,带有尾随空格的主题名也会导致这种行为。制作人将尝试将其发送到此主题,但kafka集群似乎不知道如何处理它,从而导致这些超时。
因此,对于所有遇到此问题的人,请在继续进行故障排除之前检查/更改主题名称。

相关问题