kafka:删除空闲消费者组id

vdzxcuhz  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(614)

在某些情况下,我使用kafka流对主题的小内存(hashmap)投影进行建模。k,v缓存确实需要一些操作,因此它不是globalktable的好例子。在这种“缓存”场景中,我希望我的所有兄弟示例都具有相同的缓存,因此需要绕过使用者组机制。
为了实现这一点,我通常只需使用随机生成的应用程序id启动我的应用程序,这样每次应用程序重新启动时都会重新加载主题。唯一需要注意的是,我最终发现一些消费者群体孤立于kafka代理上,一直到offset.retention.minutes,这对于我们的运营监控工具来说并不理想。你知道怎么解决这个问题吗?
我们是否可以将applicationid配置为昙花一现,以便在应用程序死亡后它消失?
或者我们可以强制消费者只在本地管理其补偿吗?
或者,在优雅地关闭应用程序时,是否有一些javaadminapi可以用来清理我的消费者组id?
谢谢

9jyewag0

9jyewag01#

系统中有一个javaapi AdminClient 打电话 deleteConsumerGroups 可用于删除单个消费者组。
您可以在Kafka2.5.0中使用它,如下所示。

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;

public class DeleteConsumerGroups {
  public static void main(String[] args) {
    System.out.println("***Starting AdminClient to delete a Consumer Group***");

    final Properties properties = new Properties();
    properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
    properties.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "5000");

    AdminClient adminClient = AdminClient.create(properties);
    String consumerGroupToBeDeleted = "console-consumer-65092";
    DeleteConsumerGroupsResult deleteConsumerGroupsResult = adminClient.deleteConsumerGroups(Arrays.asList(consumerGroupToBeDeleted));

    KafkaFuture<Void> resultFuture = deleteConsumerGroupsResult.all();
    try {
      resultFuture.get();
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) {
      e.printStackTrace();
    }

    adminClient.close();
  }
}

在运行上面的代码之前,先列出consumergroups

$ kafka-consumer-groups --bootstrap-server localhost:9092 --list
console-consumer-65092
console-consumer-53268

运行上述代码后的consumergroups列表

$ kafka-consumer-groups --bootstrap-server localhost:9092 --list
console-consumer-53268

相关问题