kafka服务器配置为端口号后面的路径(从server.properties)
zookeeper.connect=xxxxx007:2181/kafka
java生产者代码:
Properties props = new Properties();
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "xxxxx007:9092");
如果代理省略/kafka,则生产者填充主题
当代理包含/kafa时,生产者获得numberformatexception
Properties props = new Properties();
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "xxxxx007:9092/kafka");
java.lang.NumberFormatException: For input string: "9092/kafka"
如果zookeeper连接包含/kafka,java使用者将挂起(不返回任何数据)
Properties props = new Properties();
props.put("zookeeper.connect", "xxxxx007:2181/kafka");
如果zookeeper连接省略/kakfa,java使用者将获得异常
Properties props = new Properties();
props.put("zookeeper.connect", "xxxxx007:2181");
Exception in thread "main" kafka.common.ConsumerRebalanceFailedException: group1_BFTSLBHW0000RGU-1397591737558-f75b6658 can't rebalance after 4 retries
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:428)
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:718)
at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:209)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:80)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:92)
at kafka.examples.TitaniumConsumer.main(TitaniumConsumer.java:73)
1条答案
按热度按时间blpfk2vs1#
指定这个zookeeper路径的意图是,对于一个特定的集群,kafka中所有可用的数据都出现在这个特定的路径下。
请注意,您必须在启动代理之前自己创建此路径,并且使用者必须使用相同的连接字符串。