我正在构建一个java8应用程序,它在kafka主题中只查询一条消息。每个请求创建一个新的 Consumer
对象(独立于任何现有 Consumer
对象),它轮询我的Kafka主题,得到一条记录,然后 Consumer
已关闭。这种情况每天发生约20万次,每个请求都独立于所有其他请求,因此我不认为我可以重用消费者。基本上,用户从主题请求一条消息,然后为其创建一个消费者,然后关闭。这种情况平均每秒发生约2次,但是任意的,因此可能发生10次/秒或1次/小时,没有办法知道。
过了一段时间,kafka服务器(不是运行代码的服务器,而是实际运行kafka的服务器)上的堆大小变得很大,垃圾收集无法清除它。最终,gc占用的cpu时间比其他任何东西都多,在我重新启动kafka之前,一切都崩溃了。
下面是导致问题的代码的近似版本,带有 while(true)
近似真实行为(在生产中,消费者不是在while循环中创建的,而是在用户请求来自主题的消息时按需创建的):
Properties props = new Properties();
props.put("bootstrap.servers", "SERVER_IP:9092");
props.put("session.timeout.ms", 30000);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", 1000);
while(true){
Consumer<String, String> consumer = new KafkaConsumer<>(props);
TopicPartition tp = new TopicPartition("TOPIC", 0);
consumer.assign(Arrays.asList(tp));
consumer.seekToEnd(Arrays.asList(tp));
// I've narrowed down the memory leak to this line
ConsumerRecords<String, String> cr = consumer.poll(1000);
// If I remove this line ^, the memory leak does not happen
/* CODE TO GET ONE RECORD */
consumer.unsubscribe();
consumer.close();
}
在20个JVM上运行此代码会在大约20分钟内导致内存泄漏
我做错什么了吗(或者有更好的方法来解决这个问题),或者这是Kafka的一个错误,当很多消费者被创建和关闭的时候?
我在客户端运行kafka 0.10.2.1,在服务器端运行kafka 0.10.2.0。
3条答案
按热度按时间6rqinv9w1#
kafka 2.4.0(可能是以前的版本)存在资源泄漏,其中一些MBean没有在consumer.close()上取消注册。
这可能是你最初问这个问题时的情况,当然托尼在上面对你的问题的评论中提到了这一点。
https://issues.apache.org/jira/browse/kafka-9504?jql=project%20%3d%20kafka%20and%20text%20~%20%22用户%20泄漏%22
lhcgjxsq2#
您每天轮询kafka约20万次,即每小时约8千次/分钟约140次/秒约两次-为什么每次都要创建(并关闭)一个新的consumer示例?只需安排
KafkaConsumer
根据所需的时间间隔触发(可以使用jdkScheduledExecutorService
为此)并重用相同的使用者示例mfpqipee3#
不管您收到的请求的数量和频率如何,您仍然可以重用kafkaconsumer示例。您只能在请求到达时进行轮询,但不需要每次都创建和关闭使用者。
话虽如此,如果内存使用量增加并且gcs没有回收,那么您对消费者的使用可能已经揭示了代理上的内存管理问题。我看到过这样的问题:当生产者被频繁地回收利用时,代理耗尽了直接内存。所以很可能还有改进的空间。或许最好在issues.apache.org上提交一份清单,让人看看。