kafka1.0:使用java更改特定主题的分区数

mnemlml8  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(319)

kafka 1.0中的adminutils.addpartition调用似乎已将签名从2版本-0.10.2.0更改为1.0.0,如下所示

Kafka0.10.2.0

/**
  * Add partitions to existing topic with optional replica assignment
  *
  * @param zkUtils Zookeeper utilities
  * @param topic Topic for adding partitions to
  * @param numPartitions Number of partitions to be set
  * @param replicaAssignmentStr Manual replica assignment
  * @param checkBrokerAvailable Ignore checking if assigned replica broker             is available. Only used for testing
   */
   def addPartitions(zkUtils: ZkUtils,
                     topic: String,
                     numPartitions: Int = 1,
                     replicaAssignmentStr: String = "",
                     checkBrokerAvailable: Boolean = true,
                     rackAwareMode: RackAwareMode = RackAwareMode.Enforced)

==========================

Kafka1.0.0

/**
   * Add partitions to existing topic with optional replica assignment
   *
   * @param zkUtils Zookeeper utilities
   * @param topic Topic for adding partitions to
   * @param existingAssignment A map from partition id to its assigned      replicas
   * @param allBrokers All brokers in the cluster
   * @param numPartitions Number of partitions to be set
   * @param replicaAssignment Manual replica assignment, or none
   * @param validateOnly If true, validate the parameters without actually      adding the partitions
   * @return the updated replica assignment
   */
   def addPartitions(zkUtils: ZkUtils,
                     topic: String,
                     existingAssignment: Map[Int, Seq[Int]],
                     allBrokers: Seq[BrokerMetadata],
                     numPartitions: Int = 1,
                     replicaAssignment: Option[Map[Int, Seq[Int]]] = None,
                     validateOnly: Boolean = false): Map[Int, Seq[Int]] =

关于如何在java中轻松使用新的addpartitions调用而不必指定现有和新的副本分配,有什么想法吗?

70gysomp

70gysomp1#

试试这个,对我有用

Seq<String> names = JavaConverters.asScalaBuffer(Arrays.asList(topicname)).toSeq();
        Seq<BrokerMetadata> brokers = AdminUtils.getBrokerMetadatas(zkUtils, Enforced$.MODULE$, Option.empty());

        Map<String, Map<Object, Seq<Object>>> assignment
            = zkUtils.getPartitionAssignmentForTopics(names);
        Option<Map<Object, Seq<Object>>> assgn = assignment.get(topicname);

        AdminUtils.addPartitions(zkUtils, topicname, assgn.get(), brokers, partitions, Option.empty(), false);
wn9m85ua

wn9m85ua2#

@穆尼什-谢谢你的回复。这是对我有用的代码。

Seq<String> names = JavaConverters.asScalaBuffer(Arrays.asList(topicName));

Seq<BrokerMetadata> brokers = AdminUtils.getBrokerMetadatas(zkUtils,RackAwareMode.Enforced$.MODULE$, Option.empty());

scala.collection.mutable.Map<String, scala.collection.Map<Object, Seq<Object>>> assignment = (scala.collection.mutable.Map<String,     scala.collection.Map<Object, Seq<Object>>>)zkUtils.getPartitionAssignmentForTopics(names);

Map<String, scala.collection.Map<Object, Seq<Object>>> partitionaAssigmentMap = JavaConverters.mutableMapAsJavaMap(assignment);

AdminUtils.addPartitions(zkUtils, topicName, partitionaAssigmentMap.get(topicName), brokers,partitionCount, Option.empty(), false);
bvhaajcl

bvhaajcl3#

从1.0.0开始,您可以改用kafka客户机库

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version> <!-- or newer -->
</dependency>

在java代码中,它看起来像:

NewPartition newPartitionRequest = NewPartitions.increaseTo(10);
client.createPartitions(Collections.singletonMap("topicName", newPartitionRequest)).all().get();  //kafka clients AdminClient

相关问题