尝试使用下面提到的createtopics函数创建主题时,我在多节点kafka集群中遇到了“复制因子:1大于可用代理:0”。我有3个kafka代理,我试图为每个主题创建2个分区,并将复制因子保持为1。不知道为什么会出现这个错误。在单节点设置(1个zookeeper和1个kafka代理)中也可以很好地工作。
任何帮助都将不胜感激/
错误:
kafka.admin.AdminOperationException: replication factor: 1 larger than available brokers: 0
at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:117)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:403)
at kafka.admin.AdminUtils.createTopic(AdminUtils.scala)
at io.confluent.examples.producer.ZookeeperUtil.createTopics(ZookeeperUtil.java:98)
at io.confluent.examples.producer.ProducerGroup.<init>(ProducerGroup.java:50)
at io.confluent.examples.producer.ProducerGroup.main(ProducerGroup.java:124)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:297)
at java.lang.Thread.run(Thread.java:745)
private static final int DEFAULT_SESSION_TIMEOUT = 10 * 1000;
private static final int DEFAULT_CONNECTION_TIMEOUT = 8 * 1000;
private static final String ZOOKEEPER_CONNECT = "zNode01:2181,zNode02:2181,zNode03:2181";
/**
* Opens a new ZooKeeper client to access the Kafka broker.
*/
private static ZkClient connectToZookeeper ()
{
return new ZkClient(ZOOKEEPER_CONNECT,
DEFAULT_SESSION_TIMEOUT,
DEFAULT_CONNECTION_TIMEOUT,
ZKStringSerializer$.MODULE$);
}
/**
* Given a ZooKeeper client instance, accesses the broker and returns
* information about Kafka's contents.
*
* @param zookeeperClient A ZooKeeper client to access broker information
* through.
*/
private static ZkUtils zookeeperUtility (ZkClient zookeeperClient)
{
boolean isSecureCluster = false;
return new ZkUtils(zookeeperClient,
new ZkConnection(ZOOKEEPER_CONNECT),
isSecureCluster);
}
public static void createTopics (ArrayList<String> names, int partitions, int replication)
{
ZkClient zkClient = connectToZookeeper();
ZkUtils zkUtils = zookeeperUtility(zkClient);
try{
for (String name: names)
{
if (existsTopic(name))
continue;
AdminUtils.createTopic(zkUtils, name, partitions, replication, new Properties(),RackAwareMode.Disabled$.MODULE$);
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
if (zkClient != null) {
zkClient.close();
}
}
}
我按照以下说明设置了一个多节点kafka集群。
设置多节点apachezookeeper集群
在集群的每个节点上,向kafka/config/zookeeper.properties文件添加以下行
server.1=zNode01:2888:3888
server.2=zNode02:2888:3888
server.3=zNode03:2888:3888
#add here more servers if you want
initLimit=5
syncLimit=2
在集群的每个节点上,在datadir属性表示的文件夹中创建一个名为myid的文件(默认情况下,文件夹是/tmp/zookeeper)。myid文件应该只包含znode的id(“1”表示znode01,“2”表示znode02,等等…)
设置多代理apache kafka群集
在集群的每个节点上,修改kafka/config/server.properties文件中的属性zookeeper.connect:
zookeeper.connect=zNode01:2181,zNode02:2181,zNode03:2181
在集群的每个节点上,修改kafka/config/server.properties文件中的属性host.name:host.name=znode0x
在集群的每个节点上修改kafka/config/server.properties文件中的property broker.id(集群中的每个代理都应该有一个唯一的id)
暂无答案!
目前还没有任何答案,快来回答吧!