adminutils.createtopic api能否连接到多个zookeeper节点?

1zmg4dgp  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(379)

尝试使用下面提到的createtopics函数创建主题时,我在多节点kafka集群中遇到了“复制因子:1大于可用代理:0”。我有3个kafka代理,我试图为每个主题创建2个分区,并将复制因子保持为1。不知道为什么会出现这个错误。在单节点设置(1个zookeeper和1个kafka代理)中也可以很好地工作。
任何帮助都将不胜感激/
错误:

  1. kafka.admin.AdminOperationException: replication factor: 1 larger than available brokers: 0
  2. at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:117)
  3. at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:403)
  4. at kafka.admin.AdminUtils.createTopic(AdminUtils.scala)
  5. at io.confluent.examples.producer.ZookeeperUtil.createTopics(ZookeeperUtil.java:98)
  6. at io.confluent.examples.producer.ProducerGroup.<init>(ProducerGroup.java:50)
  7. at io.confluent.examples.producer.ProducerGroup.main(ProducerGroup.java:124)
  8. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  9. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  10. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  11. at java.lang.reflect.Method.invoke(Method.java:497)
  12. at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:297)
  13. at java.lang.Thread.run(Thread.java:745)
  14. private static final int DEFAULT_SESSION_TIMEOUT = 10 * 1000;
  15. private static final int DEFAULT_CONNECTION_TIMEOUT = 8 * 1000;
  16. private static final String ZOOKEEPER_CONNECT = "zNode01:2181,zNode02:2181,zNode03:2181";
  17. /**
  18. * Opens a new ZooKeeper client to access the Kafka broker.
  19. */
  20. private static ZkClient connectToZookeeper ()
  21. {
  22. return new ZkClient(ZOOKEEPER_CONNECT,
  23. DEFAULT_SESSION_TIMEOUT,
  24. DEFAULT_CONNECTION_TIMEOUT,
  25. ZKStringSerializer$.MODULE$);
  26. }
  27. /**
  28. * Given a ZooKeeper client instance, accesses the broker and returns
  29. * information about Kafka's contents.
  30. *
  31. * @param zookeeperClient A ZooKeeper client to access broker information
  32. * through.
  33. */
  34. private static ZkUtils zookeeperUtility (ZkClient zookeeperClient)
  35. {
  36. boolean isSecureCluster = false;
  37. return new ZkUtils(zookeeperClient,
  38. new ZkConnection(ZOOKEEPER_CONNECT),
  39. isSecureCluster);
  40. }
  41. public static void createTopics (ArrayList<String> names, int partitions, int replication)
  42. {
  43. ZkClient zkClient = connectToZookeeper();
  44. ZkUtils zkUtils = zookeeperUtility(zkClient);
  45. try{
  46. for (String name: names)
  47. {
  48. if (existsTopic(name))
  49. continue;
  50. AdminUtils.createTopic(zkUtils, name, partitions, replication, new Properties(),RackAwareMode.Disabled$.MODULE$);
  51. }
  52. } catch (Exception ex) {
  53. ex.printStackTrace();
  54. } finally {
  55. if (zkClient != null) {
  56. zkClient.close();
  57. }
  58. }
  59. }

我按照以下说明设置了一个多节点kafka集群。
设置多节点apachezookeeper集群
在集群的每个节点上,向kafka/config/zookeeper.properties文件添加以下行

  1. server.1=zNode01:2888:3888
  2. server.2=zNode02:2888:3888
  3. server.3=zNode03:2888:3888
  4. #add here more servers if you want
  5. initLimit=5
  6. syncLimit=2

在集群的每个节点上,在datadir属性表示的文件夹中创建一个名为myid的文件(默认情况下,文件夹是/tmp/zookeeper)。myid文件应该只包含znode的id(“1”表示znode01,“2”表示znode02,等等…)
设置多代理apache kafka群集
在集群的每个节点上,修改kafka/config/server.properties文件中的属性zookeeper.connect:

  1. zookeeper.connect=zNode01:2181,zNode02:2181,zNode03:2181

在集群的每个节点上,修改kafka/config/server.properties文件中的属性host.name:host.name=znode0x
在集群的每个节点上修改kafka/config/server.properties文件中的property broker.id(集群中的每个代理都应该有一个唯一的id)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题