我们有一个前端层,它只接收消息并写入kafka主题以进行后端处理。我们以很高的速率发送信息;我们每天处理10亿条信息。我们有一个线程池,它接受消息并写入kafka producer示例。在这里,我只创建了一个生产者(单个示例),它在多个线程之间共享。
最近,我观察到90%的线程处于阻塞状态。我发现Kafka是按顺序发送数据的。系统中有一个同步块 producer.send()
kafka java驱动程序中的方法:
def send(messages: KeyedMessage[K,V]*) {
**lock synchronized {**
if (hasShutdown.get)
throw new ProducerClosedException
recordStats(messages)
sync match {
case true => eventHandler.handle(messages)
case false => asyncSend(messages)
}
}
}
文档说我们不需要创建多个producer示例;一个示例可以在多线程环境中共享。但我们怎么做呢?或者我们最好创建一个生产者示例池?
1条答案
按热度按时间tyg4sfes1#
之所以建议跨线程共享发布服务器客户端,是因为这样可以更好地批处理消息,因为消息是在分区级别批处理的。更好的批处理可以带来更好的压缩(如果启用)和更好的吞吐量。您可以考虑调整参数,如buffer memory和linger.ms以及批处理大小,以优化吞吐量。一个这样做,然后您可以考虑添加多个生产者。另外,如果主题的传入率很高,请考虑增加该主题的分区数。