kafka代理内存泄漏由许多使用者触发

v440hwme  于 2021-06-08  发布在  Kafka
关注(0)|答案(3)|浏览(477)

我正在构建一个java8应用程序,它在kafka主题中只查询一条消息。每个请求创建一个新的 Consumer 对象(独立于任何现有 Consumer 对象),它轮询我的Kafka主题,得到一条记录,然后 Consumer 已关闭。这种情况每天发生约20万次,每个请求都独立于所有其他请求,因此我不认为我可以重用消费者。基本上,用户从主题请求一条消息,然后为其创建一个消费者,然后关闭。这种情况平均每秒发生约2次,但是任意的,因此可能发生10次/秒或1次/小时,没有办法知道。
过了一段时间,kafka服务器(不是运行代码的服务器,而是实际运行kafka的服务器)上的堆大小变得很大,垃圾收集无法清除它。最终,gc占用的cpu时间比其他任何东西都多,在我重新启动kafka之前,一切都崩溃了。
下面是导致问题的代码的近似版本,带有 while(true) 近似真实行为(在生产中,消费者不是在while循环中创建的,而是在用户请求来自主题的消息时按需创建的):

Properties props = new Properties();
props.put("bootstrap.servers", "SERVER_IP:9092");
props.put("session.timeout.ms", 30000);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", 1000);

while(true){
    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    TopicPartition tp = new TopicPartition("TOPIC", 0);
    consumer.assign(Arrays.asList(tp));
    consumer.seekToEnd(Arrays.asList(tp));

    // I've narrowed down the memory leak to this line
    ConsumerRecords<String, String> cr = consumer.poll(1000); 
    // If I remove this line ^, the memory leak does not happen

    /* CODE TO GET ONE RECORD */

    consumer.unsubscribe();
    consumer.close();
}

在20个JVM上运行此代码会在大约20分钟内导致内存泄漏

我做错什么了吗(或者有更好的方法来解决这个问题),或者这是Kafka的一个错误,当很多消费者被创建和关闭的时候?
我在客户端运行kafka 0.10.2.1,在服务器端运行kafka 0.10.2.0。

6rqinv9w

6rqinv9w1#

kafka 2.4.0(可能是以前的版本)存在资源泄漏,其中一些MBean没有在consumer.close()上取消注册。
这可能是你最初问这个问题时的情况,当然托尼在上面对你的问题的评论中提到了这一点。
https://issues.apache.org/jira/browse/kafka-9504?jql=project%20%3d%20kafka%20and%20text%20~%20%22用户%20泄漏%22

lhcgjxsq

lhcgjxsq2#

您每天轮询kafka约20万次,即每小时约8千次/分钟约140次/秒约两次-为什么每次都要创建(并关闭)一个新的consumer示例?只需安排 KafkaConsumer 根据所需的时间间隔触发(可以使用jdk ScheduledExecutorService 为此)并重用相同的使用者示例

mfpqipee

mfpqipee3#

不管您收到的请求的数量和频率如何,您仍然可以重用kafkaconsumer示例。您只能在请求到达时进行轮询,但不需要每次都创建和关闭使用者。
话虽如此,如果内存使用量增加并且gcs没有回收,那么您对消费者的使用可能已经揭示了代理上的内存管理问题。我看到过这样的问题:当生产者被频繁地回收利用时,代理耗尽了直接内存。所以很可能还有改进的空间。或许最好在issues.apache.org上提交一份清单,让人看看。

相关问题