通过javaapi获取和更新kafka主题配置

mwkjh3gx  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(359)

我正在编写一个应用程序,通过javaapi对kafka主题执行一系列操作。我可以创建主题和添加分区。我需要关于获取主题元数据(如分区、代理)和配置以及更新配置的帮助。
为了便于参考,我想更新这里提供的主题级配置-https://kafka.apache.org/documentation#configuration 例如cleanup.policy、compression.type等

2mbi3lxu

2mbi3lxu1#

将我的答案与一个类似的问题联系起来,这是如何获取java主题的配置。apachekafka客户端(java):列出主题并检查主题是否被日志压缩
要点是你需要创建一个 AdminClient ,然后是 ConfigResource 属性,调用 describeConfigs 方法并通过 DescribeConfigsResult 查找您要查找的特定配置。
这里是代码,更多的解释是在我的答案在上面的链接。

import org.apache.kafka.clients.admin.AdminClient;
import java.util.*;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.config.ConfigResource;

admin = AdminClient.create(properties);

Collection<ConfigResource> cr =  Collections.singleton( new ConfigResource(ConfigResource.Type.TOPIC, "<Your Topic Name>") 

DescribeConfigsResult ConfigsResult = admin.describeConfigs(cr));
Config all_configs = (Config)ConfigsResult.all().get().values().toArray()[0];

  Iterator ConfigIterator = all_configs.entries().iterator();

    while (ConfigIterator.hasNext()) 
    {
          ConfigEntry currentConfig = (ConfigEntry) ConfigIterator.next();
          if (currentConfig.name().equals("cleanup.policy")) {
                System.out.println(currentConfig.value());
          }
        }
vlju58qv

vlju58qv2#

您可以使用下面的代码打印主题级配置。更新配置的用法类似。
string[]args={--zookeeper“,”localhost:2181“,”--实体类型“,”主题“,”实体名称“,”测试“,”描述“};
configcommand.main(args);
至于获取元数据,请参阅中查找主题和分区的主代理https://cwiki.apache.org/confluence/display/kafka/0.8.0+simpleconsumer+example:
添加:使用adminutils添加配置获取和更新示例:

ZkUtils zkUtils = ZkUtils.apply("localhost:2181/k1", 6000, 10000, JaasUtils.isZkSecurityEnabled());

    Properties pp = new Properties();
    pp.setProperty("delete.retention.ms", "3000000");
    pp.setProperty("file.delete.delay.ms", "40000");
    AdminUtils.changeTopicConfig(zkUtils, "test", pp);
    Properties p = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), "test");
    System.out.println(p);

相关问题