adminutils.createtopic api能否连接到多个zookeeper节点?

1zmg4dgp  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(303)

尝试使用下面提到的createtopics函数创建主题时,我在多节点kafka集群中遇到了“复制因子:1大于可用代理:0”。我有3个kafka代理,我试图为每个主题创建2个分区,并将复制因子保持为1。不知道为什么会出现这个错误。在单节点设置(1个zookeeper和1个kafka代理)中也可以很好地工作。
任何帮助都将不胜感激/
错误:

kafka.admin.AdminOperationException: replication factor: 1 larger than available brokers: 0
at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:117)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:403)
at kafka.admin.AdminUtils.createTopic(AdminUtils.scala)
at io.confluent.examples.producer.ZookeeperUtil.createTopics(ZookeeperUtil.java:98)
at io.confluent.examples.producer.ProducerGroup.<init>(ProducerGroup.java:50)
at io.confluent.examples.producer.ProducerGroup.main(ProducerGroup.java:124)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:297)
at java.lang.Thread.run(Thread.java:745)

private static final int DEFAULT_SESSION_TIMEOUT = 10 * 1000;
private static final int DEFAULT_CONNECTION_TIMEOUT = 8 * 1000;
private static final String ZOOKEEPER_CONNECT = "zNode01:2181,zNode02:2181,zNode03:2181";

/**
 * Opens a new ZooKeeper client to access the Kafka broker.
 */
private static ZkClient connectToZookeeper ()
{
    return new ZkClient(ZOOKEEPER_CONNECT,
                        DEFAULT_SESSION_TIMEOUT,
                        DEFAULT_CONNECTION_TIMEOUT,
                        ZKStringSerializer$.MODULE$);
}

/**
 * Given a ZooKeeper client instance, accesses the broker and returns
 * information about Kafka's contents.
 *
 * @param zookeeperClient A ZooKeeper client to access broker information
 *                        through.
 */
private static ZkUtils zookeeperUtility (ZkClient zookeeperClient)
{
    boolean isSecureCluster = false;
    return new ZkUtils(zookeeperClient, 
                       new ZkConnection(ZOOKEEPER_CONNECT),
                       isSecureCluster);
}

public static void createTopics (ArrayList<String> names, int partitions, int replication)
{
    ZkClient zkClient = connectToZookeeper();
    ZkUtils zkUtils = zookeeperUtility(zkClient);

try{
    for (String name: names)
    {
        if (existsTopic(name))
            continue;

        AdminUtils.createTopic(zkUtils, name, partitions, replication, new Properties(),RackAwareMode.Disabled$.MODULE$);

    }

} catch (Exception ex) {
    ex.printStackTrace();
} finally {
    if (zkClient != null) {
        zkClient.close();
    }
}
}

我按照以下说明设置了一个多节点kafka集群。
设置多节点apachezookeeper集群
在集群的每个节点上,向kafka/config/zookeeper.properties文件添加以下行

server.1=zNode01:2888:3888
    server.2=zNode02:2888:3888
    server.3=zNode03:2888:3888
    #add here more servers if you want
    initLimit=5
    syncLimit=2

在集群的每个节点上,在datadir属性表示的文件夹中创建一个名为myid的文件(默认情况下,文件夹是/tmp/zookeeper)。myid文件应该只包含znode的id(“1”表示znode01,“2”表示znode02,等等…)
设置多代理apache kafka群集
在集群的每个节点上,修改kafka/config/server.properties文件中的属性zookeeper.connect:

zookeeper.connect=zNode01:2181,zNode02:2181,zNode03:2181

在集群的每个节点上,修改kafka/config/server.properties文件中的属性host.name:host.name=znode0x
在集群的每个节点上修改kafka/config/server.properties文件中的property broker.id(集群中的每个代理都应该有一个唯一的id)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题