kafka.utils.ZkUtils.getAllBrokersInCluster()方法的使用及代码示例

x33g5p2x  于2022-02-05 转载在 其他  
字(8.1k)|赞(0)|评价(0)|浏览(106)

本文整理了Java中kafka.utils.ZkUtils.getAllBrokersInCluster()方法的一些代码示例,展示了ZkUtils.getAllBrokersInCluster()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。ZkUtils.getAllBrokersInCluster()方法的具体详情如下:
包路径:kafka.utils.ZkUtils
类名称:ZkUtils
方法名:getAllBrokersInCluster

ZkUtils.getAllBrokersInCluster介绍

暂无

代码示例

代码示例来源:origin: linkedin/cruise-control

private Set<Integer> aliveBrokers() {
 // We get the alive brokers from ZK directly.
 return JavaConversions.asJavaCollection(_zkUtils.getAllBrokersInCluster())
            .stream().map(Broker::id).collect(toSet());
}

代码示例来源:origin: linkedin/cruise-control

brokerSampleRetentionMs = Math.max(_minBrokerSampleStoreTopicRetentionTimeMs, brokerSampleRetentionMs);
int numberOfBrokersInCluster = zkUtils.getAllBrokersInCluster().size();
if (numberOfBrokersInCluster <= 1) {
 throw new IllegalStateException(

代码示例来源:origin: linkedin/kafka-monitor

/**
 * @param zkUrl zookeeper connection url
 * @return      number of brokers in this cluster
 */
public static int getBrokerCount(String zkUrl) {
 ZkUtils zkUtils = ZkUtils.apply(zkUrl, ZK_SESSION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, JaasUtils.isZkSecurityEnabled());
 try {
  return zkUtils.getAllBrokersInCluster().size();
 } finally {
  zkUtils.close();
 }
}

代码示例来源:origin: linkedin/kafka-monitor

return getPartitionNumForTopic(zkUrl, topic);
int brokerCount = zkUtils.getAllBrokersInCluster().size();
int partitionCount = Math.max((int) Math.ceil(brokerCount * partitionToBrokerRatio), minPartitionNum);

代码示例来源:origin: stackoverflow.com

List<Broker> listBrokers() {

    final ZkConnection zkConnection = new ZkConnection(connectionString);
    final int sessionTimeoutMs = 10 * 1000;
    final int connectionTimeoutMs = 20 * 1000;
    final ZkClient zkClient = new ZkClient(connectionString,
                        sessionTimeoutMs,
                        connectionTimeoutMs,
                        ZKStringSerializer$.MODULE$);

    final ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);

    scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster());
}

代码示例来源:origin: shunfei/DCMonitor

public List<BrokerInfo> getCluster() {
 return Lists.transform(
  JavaConversions.asJavaList(ZkUtils.getAllBrokersInCluster(zkClient)), new Function<Broker, BrokerInfo>() {
   @Override
   public BrokerInfo apply(Broker input) {
    BrokerInfo info = new BrokerInfo();
    info.host = input.host();
    info.port = input.port();
    info.id = input.id();
    return info;
   }
  }
 );
}

代码示例来源:origin: pinterest/doctorkafka

/**
 *   return the list of brokers that do not have stats
 */
public List<Broker> getNoStatsBrokers() {
 Seq<Broker> brokerSeq = zkUtils.getAllBrokersInCluster();
 List<Broker> brokers = scala.collection.JavaConverters.seqAsJavaList(brokerSeq);
 List<Broker> noStatsBrokers = new ArrayList<>();
 brokers.stream().forEach(broker -> {
  if (kafkaCluster.getBroker(broker.id()) == null) {
   noStatsBrokers.add(broker);
  }
 });
 return noStatsBrokers;
}

代码示例来源:origin: SiftScience/kafka-assigner

private static Set<Integer> brokerHostnamesToBrokerIds(
    ZkUtils zkUtils, Set<String> brokerHostnameSet, boolean checkPresence) {
  List<Broker> brokers = JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster());
  Set<Integer> brokerIdSet = Sets.newHashSet();
  for (Broker broker : brokers) {
    BrokerEndPoint endpoint = broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT);
    if (brokerHostnameSet.contains(endpoint.host())) {
      brokerIdSet.add(broker.id());
    }
  }
  Preconditions.checkArgument(!checkPresence ||
      brokerHostnameSet.size() == brokerIdSet.size(),
      "Some hostnames could not be found! We found: " + brokerIdSet);
  return brokerIdSet;
}

代码示例来源:origin: apache/apex-malhar

/**
 * There is always only one string in zkHost
 * @param zkHost
 * @return
 */
public static Set<String> getBrokers(Set<String> zkHost)
{
 ZkClient zkclient = new ZkClient(zkHost.iterator().next(), 30000, 30000, ZKStringSerializer$.MODULE$);
 Set<String> brokerHosts = new HashSet<String>();
 for (Broker b : JavaConversions.asJavaIterable(ZkUtils.getAllBrokersInCluster(zkclient))) {
  brokerHosts.add(b.connectionString());
 }
 zkclient.close();
 return brokerHosts;
}

代码示例来源:origin: org.apache.apex/malhar-contrib

/**
 * There is always only one string in zkHost
 * @param zkHost
 * @return
 */
public static Set<String> getBrokers(Set<String> zkHost){
 ZkClient zkclient = new ZkClient(zkHost.iterator().next(), 30000, 30000, ZKStringSerializer$.MODULE$);
 Set<String> brokerHosts = new HashSet<String>();
 for (Broker b : JavaConversions.asJavaIterable(ZkUtils.getAllBrokersInCluster(zkclient))) {
  brokerHosts.add(b.connectionString());
 }
 zkclient.close();
 return brokerHosts;
}

代码示例来源:origin: SiftScience/kafka-assigner

private Map<Integer, String> getRackAssignment(ZkUtils zkUtils) {
  List<Broker> brokers = JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster());
  Map<Integer, String> rackAssignment = Maps.newHashMap();
  if (!disableRackAwareness) {
    for (Broker broker : brokers) {
      scala.Option<String> rack = broker.rack();
      if (rack.isDefined()) {
        rackAssignment.put(broker.id(), rack.get());
      }
    }
  }
  return rackAssignment;
}

代码示例来源:origin: gnuhpc/Kafka-zk-restapi

public List<BrokerInfo> listBrokers() {
 List<Broker> brokerList =
   CollectionConvertor.seqConvertJavaList(zkUtils.getAllBrokersInCluster());
 return brokerList
   .parallelStream()
   .collect(Collectors.toMap(Broker::id, Broker::rack))
   .entrySet()
   .parallelStream()
   .map(
     entry -> {
      String brokerInfoStr = null;
      try {
       brokerInfoStr =
         new String(
           zkClient.getData().forPath(ZkUtils.BrokerIdsPath() + "/" + entry.getKey()));
      } catch (Exception e) {
       e.printStackTrace();
      }
      BrokerInfo brokerInfo = gson.fromJson(brokerInfoStr, BrokerInfo.class);
      if (entry.getValue().isEmpty()) brokerInfo.setRack("");
      else {
       brokerInfo.setRack(entry.getValue().get());
      }
      brokerInfo.setId(entry.getKey());
      return brokerInfo;
     })
   .collect(toList());
}

代码示例来源:origin: SiftScience/kafka-assigner

private static void printCurrentBrokers(ZkUtils zkUtils) throws JSONException {
  List<Broker> brokers = JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster());
  JSONArray json = new JSONArray();
  for (Broker broker : brokers) {
    BrokerEndPoint endpoint = broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT);
    JSONObject brokerJson = new JSONObject();
    brokerJson.put("id", broker.id());
    brokerJson.put("host", endpoint.host());
    brokerJson.put("port", endpoint.port());
    if (broker.rack().isDefined()) {
      brokerJson.put("rack", broker.rack().get());
    }
    json.put(brokerJson);
  }
  System.out.println("CURRENT BROKERS:");
  System.out.println(json.toString());
}

代码示例来源:origin: com.github.pinterest/kafkastats

public static String getBrokers(String zkUrl, SecurityProtocol securityProtocol) {
 ZkUtils zkUtils = getZkUtils(zkUrl);
 Seq<Broker> brokersSeq = zkUtils.getAllBrokersInCluster();
 Broker[] brokers = new Broker[brokersSeq.size()];
 brokersSeq.copyToArray(brokers);
 String brokersStr = Arrays.stream(brokers)
   .map(b -> b.brokerEndPoint(
     ListenerName.forSecurityProtocol(securityProtocol)).connectionString())
   .reduce(null, (a, b) -> (a == null) ? b : a + "," + b);
 return brokersStr;
}

代码示例来源:origin: pinterest/doctorkafka

public static String getBrokers(String zkUrl, SecurityProtocol securityProtocol) {
 ZkUtils zkUtils = getZkUtils(zkUrl);
 Seq<Broker> brokersSeq = zkUtils.getAllBrokersInCluster();
 Broker[] brokers = new Broker[brokersSeq.size()];
 brokersSeq.copyToArray(brokers);
 String brokersStr = Arrays.stream(brokers)
   .map(b -> b.brokerEndPoint(
     ListenerName.forSecurityProtocol(securityProtocol)).connectionString())
   .reduce(null, (a, b) -> (a == null) ? b : a + "," + b);
 return brokersStr;
}

代码示例来源:origin: SiftScience/kafka-assigner

if (brokerSet == null || brokerSet.isEmpty()) {
  brokerSet = Sets.newHashSet(Lists.transform(
      JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster()),
      new Function<Broker, Integer>() {
        @Override

代码示例来源:origin: vakinge/jeesuite-libs

public List<BrokerInfo> fetchAllBrokers(){
  List<BrokerInfo> result = new ArrayList<>();
  Seq<Broker> brokers = zkUtils.getAllBrokersInCluster();
  Iterator<Broker> iterator = brokers.toList().iterator();
  while(iterator.hasNext()){
    Broker broker = iterator.next();
    Node node = broker.getNode(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)).get();
    result.add(new BrokerInfo(node.idString(), node.host(), node.port()));
  }
  return result;
}

代码示例来源:origin: com.hurence.logisland/logisland-agent

kafkaClusterZkUrl, zkSessionTimeoutMs, zkSessionTimeoutMs,
    JaasUtils.isZkSecurityEnabled());
this.brokerSeq = zkUtils.getAllBrokersInCluster();

相关文章