Kafka 1.0: changing the number of partitions for a specific topic using Java

mnemlml8, posted on 2021-06-07 in Kafka

The signature of the AdminUtils.addPartitions call appears to have changed between Kafka 0.10.2.0 and 1.0.0, as shown below.

Kafka 0.10.2.0

    /**
     * Add partitions to existing topic with optional replica assignment
     *
     * @param zkUtils Zookeeper utilities
     * @param topic Topic for adding partitions to
     * @param numPartitions Number of partitions to be set
     * @param replicaAssignmentStr Manual replica assignment
     * @param checkBrokerAvailable Ignore checking if assigned replica broker is available. Only used for testing
     */
    def addPartitions(zkUtils: ZkUtils,
                      topic: String,
                      numPartitions: Int = 1,
                      replicaAssignmentStr: String = "",
                      checkBrokerAvailable: Boolean = true,
                      rackAwareMode: RackAwareMode = RackAwareMode.Enforced)

==========================

Kafka 1.0.0

    /**
     * Add partitions to existing topic with optional replica assignment
     *
     * @param zkUtils Zookeeper utilities
     * @param topic Topic for adding partitions to
     * @param existingAssignment A map from partition id to its assigned replicas
     * @param allBrokers All brokers in the cluster
     * @param numPartitions Number of partitions to be set
     * @param replicaAssignment Manual replica assignment, or none
     * @param validateOnly If true, validate the parameters without actually adding the partitions
     * @return the updated replica assignment
     */
    def addPartitions(zkUtils: ZkUtils,
                      topic: String,
                      existingAssignment: Map[Int, Seq[Int]],
                      allBrokers: Seq[BrokerMetadata],
                      numPartitions: Int = 1,
                      replicaAssignment: Option[Map[Int, Seq[Int]]] = None,
                      validateOnly: Boolean = false): Map[Int, Seq[Int]] =
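To make the new parameters concrete, here is a minimal, Kafka-free Java sketch of the data shapes involved: an existing assignment (partition id to replica broker ids), a list of broker ids, and the updated assignment that comes back. The class `AddPartitionsSketch` and the helper `extendAssignment` are hypothetical names invented for this illustration, and the plain round-robin placement is an assumption chosen for simplicity; Kafka's actual placement logic is more involved and is not reproduced here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class AddPartitionsSketch {

    /**
     * Hypothetical helper, not part of Kafka: returns a copy of existingAssignment
     * extended to numPartitions partitions. Replicas for the new partitions are
     * placed round-robin over brokerIds (a simplification made for this sketch).
     * Assumes existing partitions are numbered 0..n-1.
     */
    static Map<Integer, List<Integer>> extendAssignment(Map<Integer, List<Integer>> existingAssignment,
                                                        List<Integer> brokerIds,
                                                        int numPartitions) {
        if (existingAssignment.isEmpty() || !existingAssignment.containsKey(0)) {
            throw new IllegalArgumentException("Existing assignment must contain partition 0");
        }
        int current = existingAssignment.size();
        // numPartitions is the new total, so it must exceed the current count
        if (numPartitions <= current) {
            throw new IllegalArgumentException("Partition count can only be increased: current="
                    + current + ", requested=" + numPartitions);
        }
        // Reuse the replication factor of partition 0 for the new partitions
        int replicationFactor = existingAssignment.get(0).size();
        if (replicationFactor > brokerIds.size()) {
            throw new IllegalArgumentException("Not enough brokers for replication factor "
                    + replicationFactor);
        }
        Map<Integer, List<Integer>> updated = new TreeMap<>(existingAssignment);
        for (int partition = current; partition < numPartitions; partition++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                replicas.add(brokerIds.get((partition + r) % brokerIds.size()));
            }
            updated.put(partition, replicas);
        }
        return updated;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> existing = new TreeMap<>();
        existing.put(0, Arrays.asList(1, 2));
        existing.put(1, Arrays.asList(2, 3));
        // Grow the topic from 2 to 4 partitions across brokers 1, 2 and 3
        Map<Integer, List<Integer>> updated = extendAssignment(existing, Arrays.asList(1, 2, 3), 4);
        System.out.println(updated); // {0=[1, 2], 1=[2, 3], 2=[3, 1], 3=[1, 2]}
    }
}
```

Note that, as in the signature above, `numPartitions` is the total number of partitions to end up with, not the number to add, and the existing entries are returned unchanged alongside the new ones.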

Any ideas on how to use the new addPartitions call easily from Java, without having to specify the existing and new replica assignments?

70gysomp #1

Try this, it worked for me:

    // Wrap the topic name in a Scala Seq, as required by ZkUtils
    Seq<String> names = JavaConverters.asScalaBuffer(Arrays.asList(topicname)).toSeq();
    // Metadata for the brokers currently in the cluster
    Seq<BrokerMetadata> brokers = AdminUtils.getBrokerMetadatas(zkUtils, Enforced$.MODULE$, Option.empty());
    // Current partition-to-replicas assignment, keyed by topic
    Map<String, Map<Object, Seq<Object>>> assignment = zkUtils.getPartitionAssignmentForTopics(names);
    Option<Map<Object, Seq<Object>>> assgn = assignment.get(topicname);
    // Option.empty() means no manual replica assignment for the new partitions
    AdminUtils.addPartitions(zkUtils, topicname, assgn.get(), brokers, partitions, Option.empty(), false);

wn9m85ua #2

@Munish, thanks for your reply. Here is the code that worked for me:

    Seq<String> names = JavaConverters.asScalaBuffer(Arrays.asList(topicName));
    Seq<BrokerMetadata> brokers = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Enforced$.MODULE$, Option.empty());
    // Fetch the current assignment and cast it so it can be converted to a Java map
    scala.collection.mutable.Map<String, scala.collection.Map<Object, Seq<Object>>> assignment = (scala.collection.mutable.Map<String, scala.collection.Map<Object, Seq<Object>>>) zkUtils.getPartitionAssignmentForTopics(names);
    Map<String, scala.collection.Map<Object, Seq<Object>>> partitionAssignmentMap = JavaConverters.mutableMapAsJavaMap(assignment);
    // Option.empty() means no manual replica assignment for the new partitions
    AdminUtils.addPartitions(zkUtils, topicName, partitionAssignmentMap.get(topicName), brokers, partitionCount, Option.empty(), false);

bvhaajcl #3

Starting with 1.0.0, you can use the kafka-clients library instead:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.0.0</version> <!-- or newer -->
    </dependency>

In Java code, it looks like this:

    // client is an org.apache.kafka.clients.admin.AdminClient from kafka-clients
    NewPartitions newPartitionRequest = NewPartitions.increaseTo(10); // 10 is the new total partition count
    client.createPartitions(Collections.singletonMap("topicName", newPartitionRequest)).all().get();
