我是新来Kafka和工作与新Kafka制作人和Kafka消费者,版本:0.9.0.1在java中,是否有任何方法可以在创建特定主题后更改/更新其分区数。我不是用zookeeper来创建主题。我的kafkaproducer在发布请求到达时自动创建主题。如果这些还不够,我还可以提供更多的细节
qc6wkl3g1#
是的,有可能。你必须进入 AdminUtils scala类 kafka_2.11-0.9.0.1.jar 添加分区。 AdminUtils 支持的分区数目在主题中只能增加。你可能需要 kafka_2.11-0.9.0.1.jar , zk-client-0.8.jar , scala-library-2.11.8.jar 以及 scala-parser-combinators_2.11-1.0.4.jar 类路径中的jar。下面代码的一部分是从kafka cloudera示例中借用/启发的。
AdminUtils
kafka_2.11-0.9.0.1.jar
zk-client-0.8.jar
scala-library-2.11.8.jar
scala-parser-combinators_2.11-1.0.4.jar
package org.apache.kafka.examples; import java.io.Closeable; import org.I0Itec.zkclient.ZkClient; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import kafka.admin.AdminOperationException; import kafka.admin.AdminUtils; import kafka.admin.RackAwareMode.Enforced$; import kafka.utils.ZKStringSerializer$; import kafka.utils.ZkUtils; public class Test { static final Logger logger = LogManager.getLogger(); public Test() { // TODO Auto-generated constructor stub } public static void addPartitions(String zkServers, String topic, int partitions) { try (AutoZkClient zkClient = new AutoZkClient(zkServers)) { ZkUtils zkUtils = ZkUtils.apply(zkClient, false); if (AdminUtils.topicExists(zkUtils, topic)) { logger.info("Altering topic {}", topic); try { AdminUtils.addPartitions(zkUtils, topic, partitions, "", true, Enforced$.MODULE$); logger.info("Topic {} altered with partitions : {}", topic, partitions); } catch (AdminOperationException aoe) { logger.info("Error while altering partitions for topic : {}", topic, aoe); } } else { logger.info("Topic {} doesn't exists", topic); } } } // Just exists for Closeable convenience private static final class AutoZkClient extends ZkClient implements Closeable { static int sessionTimeout = 30_000; static int connectionTimeout = 6_000; AutoZkClient(String zkServers) { super(zkServers, sessionTimeout, connectionTimeout, ZKStringSerializer$.MODULE$); } } public static void main(String[] args) { addPartitions("localhost:2181", "hello", 20); } }
1条答案
按热度按时间qc6wkl3g1#
是的,有可能。你必须进入
AdminUtils
scala类kafka_2.11-0.9.0.1.jar
添加分区。AdminUtils
支持的分区数目在主题中只能增加。你可能需要kafka_2.11-0.9.0.1.jar
,zk-client-0.8.jar
,scala-library-2.11.8.jar
以及scala-parser-combinators_2.11-1.0.4.jar
类路径中的jar。下面代码的一部分是从kafka cloudera示例中借用/启发的。