org.apache.pulsar.zookeeper.ZooKeeperDataCache类的使用及代码示例

x33g5p2x  于2022-02-05 转载在 其他  
字(11.1k)|赞(0)|评价(0)|浏览(169)

本文整理了Java中org.apache.pulsar.zookeeper.ZooKeeperDataCache类的一些代码示例,展示了ZooKeeperDataCache类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ZooKeeperDataCache类的具体详情如下:
包路径:org.apache.pulsar.zookeeper.ZooKeeperDataCache
类名称:ZooKeeperDataCache

ZooKeeperDataCache介绍

[英]Provides a generic cache for reading objects stored in zookeeper. Maintains the objects already serialized in memory and sets watches to receive changes notifications.
[中]为读取zookeeper中存储的对象提供通用缓存。维护已在内存中序列化的对象,并设置监视点(watch)以接收变更通知。

代码示例

代码示例来源:origin: org.apache.pulsar/pulsar-discovery-service

/**
 * Rebuilds the list of available brokers from the given broker node names.
 *
 * For every node name the load report stored under
 * {@code LOADBALANCE_BROKERS_ROOT + '/' + name} is fetched through {@code brokerInfo};
 * the field is only replaced after every report has been read.
 *
 * @param brokerNodes
 *            names of the broker nodes to look up
 * @throws Exception
 *             if a lookup fails or a report is not present
 */
private void updateBrokerList(Set<String> brokerNodes) throws Exception {
  final List<LoadManagerReport> reports = new ArrayList<>(brokerNodes.size());
  for (String node : brokerNodes) {
    final String reportPath = LOADBALANCE_BROKERS_ROOT + '/' + node;
    final LoadManagerReport report = brokerInfo.get(reportPath).get();
    reports.add(report);
  }
  // Publish the freshly built list in a single assignment.
  this.availableBrokers = reports;
}

代码示例来源:origin: org.apache.pulsar/pulsar-zookeeper-utils

/**
 * Return an item from the cache.
 *
 * If the node doesn't exist, the value will not be present.
 *
 * @param path
 *            path of the item to look up
 * @return the value obtained from {@code getAsync(path)}; not present when the node doesn't exist
 * @throws Exception
 *             if obtaining the result of the asynchronous lookup fails
 */
public Optional<T> get(final String path) throws Exception {
  // Synchronous convenience wrapper: delegates to getAsync and waits for its result.
  return getAsync(path).get();
}

代码示例来源:origin: org.apache.pulsar/pulsar-broker

/**
 * Stores the resource quota of the given <code>ServiceUnit</code>.
 *
 * The cached entry for the unit is invalidated first, then the quota is saved to its znode.
 *
 * @param suName
 *            identifier of the <code>ServiceUnit</code>
 * @param quota
 *            <code>ResourceQuota</code> to store
 * @throws Exception
 *             propagated from the cache invalidation or from saving the quota
 */
public void setQuota(String suName, ResourceQuota quota) throws Exception {
  final String quotaPath = ResourceQuotaCache.path(suName);
  // Drop the cached copy before persisting the new value.
  resourceQuotaCache.invalidate(quotaPath);
  saveQuotaToZnode(quotaPath, quota);
}

代码示例来源:origin: org.apache.pulsar/pulsar-broker

/**
 * Stops this load manager: closes and clears the load-report cache, closes the
 * active-broker cache and shuts the scheduler down.
 *
 * Every resource is skipped while it is still {@code null}, so stopping before the
 * fields were set does not fail with a {@link NullPointerException}.
 *
 * @throws PulsarServerException
 *             declared by the overridden method
 */
@Override
public void stop() throws PulsarServerException {
  if (loadReportCacheZk != null) {
    loadReportCacheZk.close();
    loadReportCacheZk.clear();
  }
  if (availableActiveBrokers != null) {
    availableActiveBrokers.close();
  }
  if (scheduler != null) {
    scheduler.shutdown();
  }
}

代码示例来源:origin: org.apache.pulsar/pulsar-broker

.get(nsIsolationPolicyPath).orElseGet(() -> {
        try {
          this.createZnodeIfNotExist(nsIsolationPolicyPath, Optional.of(Collections.emptyMap()));
      -1);
  namespaceIsolationPoliciesCache().invalidate(nsIsolationPolicyPath);
} catch (KeeperException.NoNodeException nne) {
  log.warn("[{}] Failed to update brokers/{}/namespaceIsolationPolicies: Does not exist", clientAppId(),

代码示例来源:origin: org.apache.pulsar/pulsar-broker

configCache = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH);
} catch (Exception e) {
  log.warn("Failed to read zookeeper path [{}]:", BROKER_SERVICE_CONFIGURATION_PATH, e);
dynamicConfigurationCache.registerListener(new ZooKeeperCacheListener<Map<String, String>>() {
  @SuppressWarnings("unchecked")
  @Override

代码示例来源:origin: org.apache.pulsar/pulsar-broker

policiesNode = policiesCache().getWithStat(path(POLICIES, namespaceName.toString())).orElseThrow(
      () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist."));
      ClusterData replClusterData = clustersCache().get(AdminResource.path("clusters", replCluster))
          .orElseThrow(() -> new RestException(Status.NOT_FOUND,
              "Cluster " + replCluster + " does not exist"));
  globalZk().setData(path(POLICIES, namespaceName.toString()), jsonMapper().writeValueAsBytes(policies),
      policiesNode.getValue().getVersion());
  policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
} catch (Exception e) {
  log.error("[{}] Failed to delete namespace on global ZK {}", clientAppId(), namespaceName, e);
  globalZk().delete(globalZkPolicyPath, -1);
  localZk().delete(lcaolZkPolicyPath, -1);
  policiesCache().invalidate(globalZkPolicyPath);
  localCacheService().policiesCache().invalidate(lcaolZkPolicyPath);
} catch (PulsarAdminException cae) {
  throw new RestException(cae);

代码示例来源:origin: org.apache.pulsar/pulsar-broker

final String path = path(POLICIES, namespaceName.toString());
policiesNode = policiesCache().getWithStat(path).orElseThrow(
  () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
policiesNode.getKey().subscriptionDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);
policiesCache().invalidate(path);

代码示例来源:origin: org.apache.pulsar/pulsar-broker

pulsar.getLocalZkCacheService().policiesCache().getWithStatAsync(path).thenAccept(result -> {
  .registerListener((String path, LocalPolicies data, Stat stat) -> {
    String[] paths = path.split(LOCAL_POLICIES_ROOT + "/");
    if (paths.length == 2) {
pulsar.getLocalZkCacheService().policiesCache().registerListener(this);

代码示例来源:origin: org.apache.pulsar/pulsar-zookeeper-utils

/**
 * Resolves the ZooKeeper cache to use for the bookie rack mapping and registers this
 * object as a listener on the resulting data cache.
 *
 * A cache instance already stored in {@code conf} under
 * {@code ZooKeeperCache.ZK_CACHE_INSTANCE} is reused. Otherwise, for a
 * {@link ClientConfiguration}, a new ZooKeeper client and cache are created from the
 * configured servers and timeout and the cache is stored back into {@code conf}.
 * Failures are logged and leave the cache reference {@code null}.
 *
 * @param conf
 *            configuration that may carry a cache instance or the ZooKeeper settings
 * @return the data cache returned by {@code getZkBookieRackMappingCache}, possibly {@code null}
 */
private ZooKeeperDataCache<BookiesRackConfiguration> getAndSetZkCache(Configuration conf) {
  ZooKeeperCache sharedCache = null;
  if (conf.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE) != null) {
    // A cache was already published in the configuration: reuse it.
    sharedCache = (ZooKeeperCache) conf.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE);
  } else if (conf instanceof ClientConfiguration) {
    final ClientConfiguration clientConf = (ClientConfiguration) conf;
    final int sessionTimeoutMs = clientConf.getZkTimeout();
    final String connectString = clientConf.getZkServers();
    try {
      ZooKeeper zk = ZooKeeperClient.newBuilder().connectString(connectString)
          .sessionTimeoutMs(sessionTimeoutMs).build();
      sharedCache = new ZooKeeperCache(zk) {
      };
      // Publish the new cache so later lookups on this configuration find it.
      conf.addProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, sharedCache);
    } catch (Exception e) {
      LOG.error("Error creating zookeeper client", e);
    }
  } else {
    LOG.error("No zk configurations available");
  }

  final ZooKeeperDataCache<BookiesRackConfiguration> rackMappingCache =
      getZkBookieRackMappingCache(sharedCache);
  if (rackMappingCache == null) {
    return null;
  }
  rackMappingCache.registerListener(this);
  return rackMappingCache;
}

代码示例来源:origin: org.apache.pulsar/pulsar-broker

/**
 * Deletes the failure-domain node of a cluster, together with all of its children,
 * from global ZooKeeper and clears both failure-domain caches.
 *
 * Nothing happens when the failure-domain node does not exist.
 *
 * @param clusterPath
 *            path of the cluster whose failure domains are removed
 * @throws RestException
 *             wrapping any failure raised while deleting
 */
private void deleteFailureDomain(String clusterPath) {
  try {
    final String domainsRoot = joinPath(clusterPath, ConfigurationCacheService.FAILURE_DOMAIN);
    final boolean rootExists = globalZk().exists(domainsRoot, false) != null;
    if (rootExists) {
      // Children are removed first, then the parent node itself.
      for (String child : globalZk().getChildren(domainsRoot, false)) {
        globalZk().delete(joinPath(domainsRoot, child), -1);
      }
      globalZk().delete(domainsRoot, -1);
      failureDomainCache().clear();
      failureDomainListCache().clear();
    }
  } catch (Exception e) {
    log.warn("Failed to delete failure-domain under cluster {}", clusterPath);
    throw new RestException(e);
  }
}

代码示例来源:origin: org.apache.pulsar/pulsar-broker

.get(nsIsolationPolicyPath).orElseGet(() -> {
        try {
          this.createZnodeIfNotExist(nsIsolationPolicyPath, Optional.of(Collections.emptyMap()));
      -1);
  namespaceIsolationPoliciesCache().invalidate(nsIsolationPolicyPath);
} catch (IllegalArgumentException iae) {
  log.info("[{}] Failed to update clusters/{}/namespaceIsolationPolicies/{}. Input data is invalid",

代码示例来源:origin: org.apache.pulsar/pulsar-broker

final String path = path(POLICIES, namespaceName.toString());
policiesNode = policiesCache().getWithStat(path).orElseThrow(
    () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
policiesNode.getKey().clusterDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);
policiesCache().invalidate(path);

代码示例来源:origin: org.apache.pulsar/pulsar-broker

pulsar.getConfigurationCache().policiesCache().registerListener(this);

代码示例来源:origin: org.apache.pulsar/pulsar-broker

/**
 * As any broker, stop the load manager.
 *
 * Closes the active-broker cache, closes and clears the broker data cache (each only
 * when it is non-null), then shuts down the scheduler.
 *
 * @throws PulsarServerException
 *             If an unexpected error occurred when attempting to stop the load manager.
 */
@Override
public void stop() throws PulsarServerException {
  // Both caches are null-checked before use, so a missing cache is simply skipped.
  if (availableActiveBrokers != null) {
    availableActiveBrokers.close();
  }
  if (brokerDataCache != null) {
    // Close first, then drop whatever entries are still held.
    brokerDataCache.close();
    brokerDataCache.clear();
  }
  // The scheduler is shut down unconditionally (no null check here).
  scheduler.shutdown();
}

代码示例来源:origin: org.apache.pulsar/pulsar-proxy

/**
 * Replaces the list of available brokers with the load reports of the given broker nodes.
 *
 * Each report is read through {@code brokerInfo} from
 * {@code LOADBALANCE_BROKERS_ROOT + '/' + name}; the field is assigned only once all
 * reports have been collected.
 *
 * @param brokerNodes
 *            names of the broker nodes whose reports are loaded
 * @throws Exception
 *             if a lookup fails or a report is not present
 */
private void updateBrokerList(Set<String> brokerNodes) throws Exception {
  final List<LoadManagerReport> collected = new ArrayList<>(brokerNodes.size());
  for (String nodeName : brokerNodes) {
    final String nodePath = LOADBALANCE_BROKERS_ROOT + '/' + nodeName;
    final LoadManagerReport loadReport = brokerInfo.get(nodePath).get();
    collected.add(loadReport);
  }
  // Swap in the complete list with one assignment.
  this.availableBrokers = collected;
}

代码示例来源:origin: org.apache.pulsar/pulsar-broker

/**
 * Writes the bundle ranges of a namespace to its local-policies node in local ZooKeeper,
 * creating the node first when the policies cache reports it as absent.
 *
 * The write is conditional on the version carried by {@code nsBundles}, so it may fail
 * because of a concurrent write to ZooKeeper. The outcome is reported through
 * {@code callback}.
 *
 * @param nsname
 *            namespace whose bundles are updated; must not be null
 * @param nsBundles
 *            bundles (and expected node version) to store; must not be null
 * @param callback
 *            callback handed to the ZooKeeper {@code setData} call
 * @throws Exception
 *             if reading, creating or serializing the policies fails
 */
private void updateNamespaceBundles(NamespaceName nsname, NamespaceBundles nsBundles, StatCallback callback)
    throws Exception {
  checkNotNull(nsname);
  checkNotNull(nsBundles);

  final String localPoliciesPath = joinPath(LOCAL_POLICIES_ROOT, nsname.toString());
  final boolean policiesExist =
      pulsar.getLocalZkCacheService().policiesCache().get(localPoliciesPath).isPresent();
  if (!policiesExist) {
    // No local policies yet: create them and wait (bounded) for the creation to finish.
    this.pulsar.getLocalZkCacheService().createPolicies(localPoliciesPath, false)
        .get(cacheTimeOutInSec, SECONDS);
  }

  final long expectedVersion = nsBundles.getVersion();
  final LocalPolicies updatedPolicies = new LocalPolicies();
  updatedPolicies.bundles = getBundlesData(nsBundles);
  final byte[] payload = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(updatedPolicies);

  this.pulsar.getLocalZkCache().getZooKeeper()
      .setData(localPoliciesPath, payload, Math.toIntExact(expectedVersion), callback, null);

  // Invalidate the namespace's cached local policies.
  this.pulsar.getLocalZkCacheService().policiesCache().invalidate(localPoliciesPath);
}

代码示例来源:origin: org.apache.pulsar/pulsar-broker

ZooKeeperDataCache<Policies> policiesCache = pulsar.getConfigurationCache().policiesCache();
policiesCache.getAsync(path(POLICIES, namespaceName)).thenAccept(policies -> {
  if (!policies.isPresent() || StringUtils.isBlank(policies.get().antiAffinityGroup)) {
    antiAffinityNsBrokersResult.complete(null);
      CompletableFuture<Void> future = new CompletableFuture<>();
      futures.add(future);
      policiesCache.getAsync(path(POLICIES, ns)).thenAccept(nsPolicies -> {
        if (nsPolicies.isPresent() && antiAffinityGroup.equalsIgnoreCase(nsPolicies.get().antiAffinityGroup)) {
          brokerToAntiAffinityNamespaceCount.compute(broker,

代码示例来源:origin: org.apache.pulsar/pulsar-broker-common

configCache.policiesCache().invalidate(policiesPath);

代码示例来源:origin: org.apache.pulsar/pulsar-broker

/**
 * Enables or disables deduplication in the policies of the current namespace.
 *
 * The policies are read together with their stat, the {@code deduplicationEnabled}
 * field is updated, and the result is written back to global ZooKeeper conditioned on
 * the version that was read. The cached policies are invalidated afterwards.
 *
 * @param enableDeduplication
 *            value to store in the {@code deduplicationEnabled} policy field
 * @throws RestException
 *             {@code NOT_FOUND} when the namespace policies do not exist,
 *             {@code CONFLICT} when the policies node was modified concurrently, or
 *             wrapping any other failure
 */
protected void internalModifyDeduplication(boolean enableDeduplication) {
  validateAdminAccessForTenant(namespaceName.getTenant());
  validatePoliciesReadOnlyAccess();
  Entry<Policies, Stat> policiesNode = null;
  try {
    // Computed once and reused for the read, the write and the invalidation.
    final String policiesPath = path(POLICIES, namespaceName.toString());
    // Force to read the data s.t. the watch to the cache content is setup.
    policiesNode = policiesCache().getWithStat(policiesPath).orElseThrow(
        () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
    policiesNode.getKey().deduplicationEnabled = enableDeduplication;
    // Write back the new policies into zookeeper
    globalZk().setData(policiesPath, jsonMapper().writeValueAsBytes(policiesNode.getKey()),
        policiesNode.getValue().getVersion());
    policiesCache().invalidate(policiesPath);
    log.info("[{}] Successfully {} deduplication on namespace {}", clientAppId(),
        enableDeduplication ? "enabled" : "disabled", namespaceName);
  } catch (RestException e) {
    // Already carries the intended HTTP status (the NOT_FOUND raised above): rethrow it
    // unchanged instead of logging it as an unexpected error and re-wrapping it.
    throw e;
  } catch (KeeperException.NoNodeException e) {
    log.warn("[{}] Failed to modify deduplication status for namespace {}: does not exist", clientAppId(),
        namespaceName);
    throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
  } catch (KeeperException.BadVersionException e) {
    // Only setData is expected to raise this, i.e. after policiesNode was assigned.
    log.warn(
        "[{}] Failed to modify deduplication status on namespace {} expected policy node version={} : concurrent modification",
        clientAppId(), namespaceName, policiesNode.getValue().getVersion());
    throw new RestException(Status.CONFLICT, "Concurrent modification");
  } catch (Exception e) {
    log.error("[{}] Failed to modify deduplication status on namespace {}", clientAppId(), namespaceName, e);
    throw new RestException(e);
  }
}

相关文章