本文整理了Java中kafka.utils.ZkUtils.getAllBrokersInCluster()
方法的一些代码示例,展示了ZkUtils.getAllBrokersInCluster()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。ZkUtils.getAllBrokersInCluster()
方法的具体详情如下:
包路径:kafka.utils.ZkUtils
类名称:ZkUtils
方法名:getAllBrokersInCluster
暂无
代码示例来源:origin: linkedin/cruise-control
private Set<Integer> aliveBrokers() {
// We get the alive brokers from ZK directly.
return JavaConversions.asJavaCollection(_zkUtils.getAllBrokersInCluster())
.stream().map(Broker::id).collect(toSet());
}
代码示例来源:origin: linkedin/cruise-control
brokerSampleRetentionMs = Math.max(_minBrokerSampleStoreTopicRetentionTimeMs, brokerSampleRetentionMs);
int numberOfBrokersInCluster = zkUtils.getAllBrokersInCluster().size();
if (numberOfBrokersInCluster <= 1) {
throw new IllegalStateException(
代码示例来源:origin: linkedin/kafka-monitor
/**
* @param zkUrl zookeeper connection url
* @return number of brokers in this cluster
*/
public static int getBrokerCount(String zkUrl) {
ZkUtils zkUtils = ZkUtils.apply(zkUrl, ZK_SESSION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, JaasUtils.isZkSecurityEnabled());
try {
return zkUtils.getAllBrokersInCluster().size();
} finally {
zkUtils.close();
}
}
代码示例来源:origin: linkedin/kafka-monitor
return getPartitionNumForTopic(zkUrl, topic);
int brokerCount = zkUtils.getAllBrokersInCluster().size();
int partitionCount = Math.max((int) Math.ceil(brokerCount * partitionToBrokerRatio), minPartitionNum);
代码示例来源:origin: stackoverflow.com
List<Broker> listBrokers() {
final ZkConnection zkConnection = new ZkConnection(connectionString);
final int sessionTimeoutMs = 10 * 1000;
final int connectionTimeoutMs = 20 * 1000;
final ZkClient zkClient = new ZkClient(connectionString,
sessionTimeoutMs,
connectionTimeoutMs,
ZKStringSerializer$.MODULE$);
final ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster());
}
代码示例来源:origin: shunfei/DCMonitor
public List<BrokerInfo> getCluster() {
return Lists.transform(
JavaConversions.asJavaList(ZkUtils.getAllBrokersInCluster(zkClient)), new Function<Broker, BrokerInfo>() {
@Override
public BrokerInfo apply(Broker input) {
BrokerInfo info = new BrokerInfo();
info.host = input.host();
info.port = input.port();
info.id = input.id();
return info;
}
}
);
}
代码示例来源:origin: pinterest/doctorkafka
/**
* return the list of brokers that do not have stats
*/
public List<Broker> getNoStatsBrokers() {
Seq<Broker> brokerSeq = zkUtils.getAllBrokersInCluster();
List<Broker> brokers = scala.collection.JavaConverters.seqAsJavaList(brokerSeq);
List<Broker> noStatsBrokers = new ArrayList<>();
brokers.stream().forEach(broker -> {
if (kafkaCluster.getBroker(broker.id()) == null) {
noStatsBrokers.add(broker);
}
});
return noStatsBrokers;
}
代码示例来源:origin: SiftScience/kafka-assigner
private static Set<Integer> brokerHostnamesToBrokerIds(
ZkUtils zkUtils, Set<String> brokerHostnameSet, boolean checkPresence) {
List<Broker> brokers = JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster());
Set<Integer> brokerIdSet = Sets.newHashSet();
for (Broker broker : brokers) {
BrokerEndPoint endpoint = broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT);
if (brokerHostnameSet.contains(endpoint.host())) {
brokerIdSet.add(broker.id());
}
}
Preconditions.checkArgument(!checkPresence ||
brokerHostnameSet.size() == brokerIdSet.size(),
"Some hostnames could not be found! We found: " + brokerIdSet);
return brokerIdSet;
}
代码示例来源:origin: apache/apex-malhar
/**
* There is always only one string in zkHost
* @param zkHost
* @return
*/
public static Set<String> getBrokers(Set<String> zkHost)
{
ZkClient zkclient = new ZkClient(zkHost.iterator().next(), 30000, 30000, ZKStringSerializer$.MODULE$);
Set<String> brokerHosts = new HashSet<String>();
for (Broker b : JavaConversions.asJavaIterable(ZkUtils.getAllBrokersInCluster(zkclient))) {
brokerHosts.add(b.connectionString());
}
zkclient.close();
return brokerHosts;
}
代码示例来源:origin: org.apache.apex/malhar-contrib
/**
* There is always only one string in zkHost
* @param zkHost
* @return
*/
public static Set<String> getBrokers(Set<String> zkHost){
ZkClient zkclient = new ZkClient(zkHost.iterator().next(), 30000, 30000, ZKStringSerializer$.MODULE$);
Set<String> brokerHosts = new HashSet<String>();
for (Broker b : JavaConversions.asJavaIterable(ZkUtils.getAllBrokersInCluster(zkclient))) {
brokerHosts.add(b.connectionString());
}
zkclient.close();
return brokerHosts;
}
代码示例来源:origin: SiftScience/kafka-assigner
private Map<Integer, String> getRackAssignment(ZkUtils zkUtils) {
List<Broker> brokers = JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster());
Map<Integer, String> rackAssignment = Maps.newHashMap();
if (!disableRackAwareness) {
for (Broker broker : brokers) {
scala.Option<String> rack = broker.rack();
if (rack.isDefined()) {
rackAssignment.put(broker.id(), rack.get());
}
}
}
return rackAssignment;
}
代码示例来源:origin: gnuhpc/Kafka-zk-restapi
public List<BrokerInfo> listBrokers() {
List<Broker> brokerList =
CollectionConvertor.seqConvertJavaList(zkUtils.getAllBrokersInCluster());
return brokerList
.parallelStream()
.collect(Collectors.toMap(Broker::id, Broker::rack))
.entrySet()
.parallelStream()
.map(
entry -> {
String brokerInfoStr = null;
try {
brokerInfoStr =
new String(
zkClient.getData().forPath(ZkUtils.BrokerIdsPath() + "/" + entry.getKey()));
} catch (Exception e) {
e.printStackTrace();
}
BrokerInfo brokerInfo = gson.fromJson(brokerInfoStr, BrokerInfo.class);
if (entry.getValue().isEmpty()) brokerInfo.setRack("");
else {
brokerInfo.setRack(entry.getValue().get());
}
brokerInfo.setId(entry.getKey());
return brokerInfo;
})
.collect(toList());
}
代码示例来源:origin: SiftScience/kafka-assigner
private static void printCurrentBrokers(ZkUtils zkUtils) throws JSONException {
List<Broker> brokers = JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster());
JSONArray json = new JSONArray();
for (Broker broker : brokers) {
BrokerEndPoint endpoint = broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT);
JSONObject brokerJson = new JSONObject();
brokerJson.put("id", broker.id());
brokerJson.put("host", endpoint.host());
brokerJson.put("port", endpoint.port());
if (broker.rack().isDefined()) {
brokerJson.put("rack", broker.rack().get());
}
json.put(brokerJson);
}
System.out.println("CURRENT BROKERS:");
System.out.println(json.toString());
}
代码示例来源:origin: com.github.pinterest/kafkastats
public static String getBrokers(String zkUrl, SecurityProtocol securityProtocol) {
ZkUtils zkUtils = getZkUtils(zkUrl);
Seq<Broker> brokersSeq = zkUtils.getAllBrokersInCluster();
Broker[] brokers = new Broker[brokersSeq.size()];
brokersSeq.copyToArray(brokers);
String brokersStr = Arrays.stream(brokers)
.map(b -> b.brokerEndPoint(
ListenerName.forSecurityProtocol(securityProtocol)).connectionString())
.reduce(null, (a, b) -> (a == null) ? b : a + "," + b);
return brokersStr;
}
代码示例来源:origin: pinterest/doctorkafka
public static String getBrokers(String zkUrl, SecurityProtocol securityProtocol) {
ZkUtils zkUtils = getZkUtils(zkUrl);
Seq<Broker> brokersSeq = zkUtils.getAllBrokersInCluster();
Broker[] brokers = new Broker[brokersSeq.size()];
brokersSeq.copyToArray(brokers);
String brokersStr = Arrays.stream(brokers)
.map(b -> b.brokerEndPoint(
ListenerName.forSecurityProtocol(securityProtocol)).connectionString())
.reduce(null, (a, b) -> (a == null) ? b : a + "," + b);
return brokersStr;
}
代码示例来源:origin: SiftScience/kafka-assigner
if (brokerSet == null || brokerSet.isEmpty()) {
brokerSet = Sets.newHashSet(Lists.transform(
JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster()),
new Function<Broker, Integer>() {
@Override
代码示例来源:origin: vakinge/jeesuite-libs
public List<BrokerInfo> fetchAllBrokers(){
List<BrokerInfo> result = new ArrayList<>();
Seq<Broker> brokers = zkUtils.getAllBrokersInCluster();
Iterator<Broker> iterator = brokers.toList().iterator();
while(iterator.hasNext()){
Broker broker = iterator.next();
Node node = broker.getNode(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)).get();
result.add(new BrokerInfo(node.idString(), node.host(), node.port()));
}
return result;
}
代码示例来源:origin: com.hurence.logisland/logisland-agent
kafkaClusterZkUrl, zkSessionTimeoutMs, zkSessionTimeoutMs,
JaasUtils.isZkSecurityEnabled());
this.brokerSeq = zkUtils.getAllBrokersInCluster();
内容来源于网络,如有侵权,请联系作者删除!