本文整理了 Java 中 kafka.utils.ZkUtils.ConsumersPath() 方法的一些代码示例,展示了 ZkUtils.ConsumersPath() 的具体用法。这些代码示例主要来源于 GitHub、Stack Overflow、Maven 等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ZkUtils.ConsumersPath() 方法的具体详情如下:
包路径:kafka.utils.ZkUtils
类名称:ZkUtils
方法名:ConsumersPath
暂无
代码示例来源:origin: uber/chaperone
/**
 * Removes the {@code offsets} and {@code owners} znodes kept under the given consumer
 * group's path ({@code <ConsumersPath>/<groupId>/...}).
 *
 * <p>A ZkUtils handle is opened for the duration of the call and always closed before
 * returning. If anything fails, the group's znode prefix is logged and the exception is
 * rethrown to the caller.
 *
 * @param groupId consumer group whose offsets/owners paths are deleted
 */
private static void removeOffsetInfoFromZk(final String groupId) {
  // NOTE(review): the same session-timeout setting is passed for both timeout arguments
  // of ZkUtils.apply — confirm this is intended.
  final Integer timeoutMs = Integer.valueOf(AuditConfig.INGESTER_ZK_SESSION_TIMEOUT_MS);
  final ZkUtils zkUtils = ZkUtils.apply(AuditConfig.INGESTER_ZK_CONNECT, timeoutMs, timeoutMs, false);
  try {
    final String[] targets = {"offsets", "owners"};
    for (int i = 0; i < targets.length; i++) {
      final String target = targets[i];
      final String zkPath = ZkUtils.ConsumersPath() + "/" + groupId + "/" + target;
      logger.info("Remove {} with znode path={}", target, zkPath);
      zkUtils.deletePathRecursive(zkPath);
    }
  } catch (Exception e) {
    final String zkPathPrefix = ZkUtils.ConsumersPath() + "/" + groupId + "/";
    logger.error("Got exception to remove offsets or owners from zookeeper, with zkPathPrefix={}",
        zkPathPrefix);
    throw e;
  } finally {
    zkUtils.close();
  }
}
代码示例来源:origin: uber/chaperone
/**
 * Writes the supplied offsets under the consumer group's path, one znode per partition:
 * {@code <ConsumersPath>/<groupId>/offsets/<topic>/<partition>}, with the offset's string
 * form as the znode data.
 *
 * <p>A ZkUtils handle is opened for the duration of the call and always closed before
 * returning. If anything fails, the group's offsets prefix is logged and the exception is
 * rethrown to the caller.
 *
 * @param groupId consumer group the offsets are stored for
 * @param topicOffsetsMap topic name mapped to (partition mapped to offset)
 */
private static void putOffsetInfoIntoZk(String groupId, Map<String, Map<Integer, Long>> topicOffsetsMap) {
  // NOTE(review): the same session-timeout setting is passed for both timeout arguments
  // of ZkUtils.apply — confirm this is intended.
  final Integer timeoutMs = Integer.valueOf(AuditConfig.INGESTER_ZK_SESSION_TIMEOUT_MS);
  final ZkUtils zkUtils = ZkUtils.apply(AuditConfig.INGESTER_ZK_CONNECT, timeoutMs, timeoutMs, false);
  try {
    for (Map.Entry<String, Map<Integer, Long>> byTopic : topicOffsetsMap.entrySet()) {
      // Trailing slash on purpose: the partition id is appended directly below.
      final String topicPrefix =
          ZkUtils.ConsumersPath() + "/" + groupId + "/offsets/" + byTopic.getKey() + "/";
      for (Map.Entry<Integer, Long> byPartition : byTopic.getValue().entrySet()) {
        final Integer partition = byPartition.getKey();
        final Long offset = byPartition.getValue();
        final String znode = topicPrefix + partition;
        logger.info("Put offset={} to partition={} with znode path={}", offset, partition, znode);
        zkUtils.updatePersistentPath(znode, offset.toString(), zkUtils.DefaultAcls());
      }
    }
  } catch (Exception e) {
    final String zkPathPrefix = ZkUtils.ConsumersPath() + "/" + groupId + "/offsets";
    logger.error("Got exception to put offset, with zkPathPrefix={}", zkPathPrefix);
    throw e;
  } finally {
    zkUtils.close();
  }
}
代码示例来源:origin: HomeAdvisor/Kafdrop
/**
 * Streams the consumers resolved for the given topic, sorted by group id.
 *
 * <p>Every child name currently cached under the consumers path is passed to
 * {@code getConsumerByTopic}; names for which it yields an empty Optional are dropped.
 *
 * <p>{@code TreeCache.getCurrentChildren} returns {@code null} when the cache holds no node
 * at the requested path, so that case is mapped to an empty stream instead of failing with
 * a NullPointerException.
 *
 * @param topic topic to resolve consumers for
 * @return consumers for the topic ordered by group id; empty when the consumers path is
 *     not present in the cache
 */
private Stream<ConsumerVO> getConsumerStream(TopicVO topic)
{
   return Optional.ofNullable(consumerTreeCache.getCurrentChildren(ZkUtils.ConsumersPath()))
      .map(children -> children.keySet().stream())
      .orElseGet(Stream::empty)
      .map(g -> getConsumerByTopic(g, topic))
      .filter(Optional::isPresent)
      .map(Optional::get)
      .sorted(Comparator.comparing(ConsumerVO::getGroupId));
}
代码示例来源:origin: gnuhpc/Kafka-zk-restapi
/**
 * Lists the consumer groups registered under the consumers path that have at least one
 * child under their {@code offsets} znode.
 *
 * <p>When {@code topic} is null or empty, every such group is returned; otherwise only the
 * groups whose {@code offsets} znode has a child named exactly {@code topic}.
 *
 * <p>The children of each {@code offsets} znode are read once per group, so the emptiness
 * check and the topic match are evaluated against the same snapshot.
 *
 * @param topic topic name to filter by, or null/empty for no filtering
 * @return names of the matching consumer groups (possibly empty)
 * @throws Exception if a ZooKeeper call made through the client fails
 */
private Set<String> listOldConsumerGroupsByTopic(@TopicExistConstraint String topic)
    throws Exception {
  final boolean anyTopic = Strings.isNullOrEmpty(topic);
  Set<String> cList = new HashSet<>();
  for (String consumer : zkClient.getChildren().forPath(ZkUtils.ConsumersPath())) {
    String path = ZkUtils.ConsumersPath() + "/" + consumer + "/offsets";
    if (zkClient.checkExists().forPath(path) == null) {
      // Group has no offsets znode at all.
      continue;
    }
    List<String> topicsWithOffsets = zkClient.getChildren().forPath(path);
    if (!topicsWithOffsets.isEmpty() && (anyTopic || topicsWithOffsets.contains(topic))) {
      cList.add(consumer);
    }
  }
  // Per the original author's note, the zkUtils.getAllConsumerGroupsForTopic(topic)
  // alternative was dropped because it may cause a KeeperException and is deprecated.
  return cList;
}
代码示例来源:origin: shunfei/DCMonitor
/**
 * Returns the child names found under the consumers path.
 *
 * <p>Never returns {@code null}: a null result from the client, or any exception raised
 * while reading (which is logged), produces an empty list.
 *
 * @return the child names under the consumers path, or an empty list
 */
public List<String> getGroups() {
  final List<String> groups;
  try {
    groups = zkClient.getChildren(ZkUtils.ConsumersPath());
  } catch (Exception e) {
    log.error(e, "could not get groups");
    return Collections.emptyList();
  }
  return groups != null ? groups : Collections.<String>emptyList();
}
代码示例来源:origin: shunfei/DCMonitor
String offsetPath = String.format(
"%s/%s/offsets/%s/%d",
ZkUtils.ConsumersPath(),
group,
topic,
ZkUtils.ConsumersPath(),
group,
topic,
代码示例来源:origin: shunfei/DCMonitor
consumers = JavaConversions.asJavaList(ZkUtils.getChildren(zkClient, ZkUtils.ConsumersPath()));
} catch (Exception e) {
log.error(e, "could not get all consumer list");
代码示例来源:origin: HomeAdvisor/Kafdrop
topicTreeCache.start();
consumerTreeCache = new TreeCache(curatorFramework, ZkUtils.ConsumersPath());
consumerTreeCache.getListenable().addListener((client, event) -> {
if (event.getType() == TreeCacheEvent.Type.INITIALIZED)
内容来源于网络,如有侵权,请联系作者删除!