com.yahoo.pulsar.zookeeper.ZooKeeperDataCache类的使用及代码示例

x33g5p2x  于2022-02-05 转载在 其他  
字(10.6k)|赞(0)|评价(0)|浏览(90)

本文整理了Java中com.yahoo.pulsar.zookeeper.ZooKeeperDataCache类的一些代码示例,展示了ZooKeeperDataCache类的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ZooKeeperDataCache类的具体详情如下:
包路径:com.yahoo.pulsar.zookeeper.ZooKeeperDataCache
类名称:ZooKeeperDataCache

ZooKeeperDataCache介绍

暂无

代码示例

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

/**
 * Reads one setting from the map cached for a ZooKeeper path.
 *
 * @param zkPath       ZooKeeper path looked up in {@code dynamicConfigurationCache}
 * @param settingName  key to read from the map stored at that path
 * @param defaultValue value used when the path has no data, the key maps to nothing, or the
 *                     lookup throws
 * @return the stored value, or {@code defaultValue} in the cases listed above
 */
private String getDynamicConfigurationFromZK(String zkPath, String settingName, String defaultValue) {
  String resolved;
  try {
    resolved = dynamicConfigurationCache.get(zkPath)
        .map(settings -> settings.get(settingName))
        .orElse(defaultValue);
  } catch (Exception e) {
    // Best effort: a failed read is logged and answered with the caller's default.
    log.warn("Got exception when reading ZooKeeper path [{}]:", zkPath, e);
    resolved = defaultValue;
  }
  return resolved;
}

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

/**
 * Resolves the local policies for {@code path}, creating them when the parent lookup finds
 * nothing.
 *
 * @param path path of the policies node to resolve
 * @return a future completed with the value from {@code super.getAsync(path)} when present,
 *         otherwise with the result of {@code createPolicies(path, true)}; completed
 *         exceptionally if either step fails
 */
@Override
  public CompletableFuture<Optional<LocalPolicies>> getAsync(String path) {
    CompletableFuture<Optional<LocalPolicies>> result = new CompletableFuture<>();
    // Consult the parent (local-zk cache) lookup first.
    super.getAsync(path).thenAccept(cached -> {
      if (cached.isPresent()) {
        result.complete(cached);
        return;
      }
      // Nothing found locally: create the policies node (per the original note, by copying it
      // from Global ZK) and hand back whatever createPolicies yields.
      createPolicies(path, true).thenAccept(created -> {
        LOG.info("Successfully created local policies for {} -- {}", path, created);
        result.complete(created);
      }).exceptionally(cause -> {
        result.completeExceptionally(cause);
        return null;
      });
    }).exceptionally(cause -> {
      result.completeExceptionally(cause);
      return null;
    });
    return result;
  }
};

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

/**
 * Stops this component by releasing its resources in a fixed order: the load-report cache is
 * closed and then cleared, the active-brokers set is closed, and finally the scheduler is shut
 * down.
 *
 * @throws PulsarServerException declared by the signature; which of the calls below can raise it
 *                               is not visible here
 */
@Override
  public void stop() throws PulsarServerException {
    // close() is issued before clear() on the same cache; keep this order.
    loadReportCacheZk.close();
    loadReportCacheZk.clear();
    availableActiveBrokers.close();
    scheduler.shutdown();
  }
}

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

.get(nsIsolationPolicyPath).orElseGet(() -> {
        try {
          this.createNamespaceIsolationPolicyNode(nsIsolationPolicyPath);
      -1);
  namespaceIsolationPoliciesCache().invalidate(nsIsolationPolicyPath);
} catch (KeeperException.NoNodeException nne) {
  log.warn("[{}] Failed to update brokers/{}/namespaceIsolationPolicies: Does not exist", clientAppId(),

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

policiesNode = policiesCache().getWithStat(path("policies", property, cluster, namespace))
      .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace " + nsName + " does not exist."));
      ClusterData replClusterData = clustersCache().get(AdminResource.path("clusters", replCluster))
          .orElseThrow(() -> new RestException(Status.NOT_FOUND,
              "Cluser " + replCluster + " does not exist"));
  globalZk().setData(path("policies", property, cluster, namespace), jsonMapper().writeValueAsBytes(policies),
      policiesNode.getValue().getVersion());
  policiesCache().invalidate(path("policies", property, cluster, namespace));
} catch (Exception e) {
  log.error("[{}] Failed to delete namespace on global ZK {}/{}/{}", clientAppId(), property, cluster,
  globalZk().delete(globalZkPolicyPath, -1);
  localZk().delete(lcaolZkPolicyPath, -1);
  policiesCache().invalidate(globalZkPolicyPath);
  localCacheService().policiesCache().invalidate(lcaolZkPolicyPath);
} catch (PulsarAdminException cae) {
  throw new RestException(cae);

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

Optional<Map<String, String>> data = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH);
  if (data.isPresent() && data.get() != null) {
    data.get().forEach((key, value) -> {
dynamicConfigurationCache.registerListener(new ZooKeeperCacheListener<Map<String, String>>() {
  @SuppressWarnings("unchecked")
  @Override

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

/**
 * Creates the factory and its asynchronous bundles cache.
 * <p>
 * The cache loader resolves the bundles of a namespace from the local policies stored under
 * {@code LOCAL_POLICIES_ROOT/<namespace>}. When {@code pulsar} or its configuration cache is
 * {@code null}, the loader answers immediately with {@code getBundles(namespace, null)}. When both
 * are available, this instance is also registered as a listener on the local policies cache.
 *
 * @param pulsar   owning service; may be {@code null}, in which case no ZooKeeper lookup or
 *                 listener registration happens
 * @param hashFunc hash function stored on the factory
 */
public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) {
  this.hashFunc = hashFunc;
  this.bundlesCache = Caffeine.newBuilder().buildAsync((NamespaceName namespace, Executor executor) -> {
    String path = AdminResource.joinPath(LOCAL_POLICIES_ROOT, namespace.toString());
    if (LOG.isDebugEnabled()) {
      LOG.debug("Loading cache with bundles for {}", namespace);
    }
    // NOTE(review): the guard checks getConfigurationCache() while the lookup below goes through
    // getLocalZkCacheService() — presumably both are initialized together; confirm.
    if (pulsar == null || pulsar.getConfigurationCache() == null) {
      return CompletableFuture.completedFuture(getBundles(namespace, null));
    }
    CompletableFuture<NamespaceBundles> future = new CompletableFuture<>();
    // Read the static bundle data from the policies
    pulsar.getLocalZkCacheService().policiesCache().getAsync(path).thenAccept(policies -> {
      // If no policies defined for namespace, assume 1 single bundle
      BundlesData bundlesData = policies.map(p -> p.bundles).orElse(null);
      NamespaceBundles namespaceBundles = getBundles(namespace, bundlesData);
      future.complete(namespaceBundles);
    }).exceptionally(ex -> {
      // Propagate lookup (or getBundles) failures to whoever awaits the cache entry.
      future.completeExceptionally(ex);
      return null;
    });
    return future;
  });
  if (pulsar != null && pulsar.getConfigurationCache() != null) {
    pulsar.getLocalZkCacheService().policiesCache().registerListener(this);
  }
  // The field is assigned last; the loader above captures the constructor parameter, not the field.
  this.pulsar = pulsar;
}

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

/**
 * Stores a new resource quota for the given <code>ServiceUnit</code>.
 * <p>
 * The cached entry for the unit's path is invalidated first; the quota is then written through
 * {@code saveQuotaToZnode}.
 *
 * @param suName
 *            identifier of the <code>ServiceUnit</code>
 * @param quota
 *            <code>ResourceQuota</code> to store
 * @throws Exception
 *             if any of the underlying calls fails
 */
public void setQuota(String suName, ResourceQuota quota) throws Exception {
  final String quotaPath = ResourceQuotaCache.path(suName);
  resourceQuotaCache.invalidate(quotaPath);
  saveQuotaToZnode(quotaPath, quota);
}

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

policiesNode = policiesCache().getWithStat(path("policies", property, cluster, namespace))
    .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace " + nsName + " does not exist"));
policiesNode.getKey().message_ttl_in_seconds = messageTTL;
policiesCache().invalidate(path("policies", property, cluster, namespace));

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

/**
 * Registers a listener that is notified when the value of one {@link ServiceConfiguration} field
 * changes in the dynamic configuration.
 * <p>
 * On notification, the listener should first check whether the value actually changed, take the
 * appropriate action, and then remember the new value so it can compare again on the next
 * config-map update.
 *
 * @param <T>
 *            type the configuration value is converted to before it is handed to the listener
 * @param configKey
 *            configuration field name
 * @param listener
 *            callback invoked with the converted value when an update for the broker service
 *            configuration path contains {@code configKey}
 */
public <T> void registerConfigurationListener(String configKey, Consumer<T> listener) {
  configRegisteredListeners.put(configKey, listener);
  dynamicConfigurationCache.registerListener(new ZooKeeperCacheListener<Map<String, String>>() {
    @SuppressWarnings("unchecked")
    @Override
    public void onUpdate(String path, Map<String, String> data, Stat stat) {
      // Ignore updates for other paths, empty payloads, and payloads without this key.
      if (!BROKER_SERVICE_CONFIGURATION_PATH.equalsIgnoreCase(path) || data == null
          || !data.containsKey(configKey)) {
        return;
      }
      final String rawValue = data.get(configKey);
      log.info("Updating configuration {}/{}", configKey, rawValue);
      // Convert the raw string using the entry registered for this key in dynamicConfigurationMap.
      listener.accept((T) FieldParser.value(rawValue, dynamicConfigurationMap.get(configKey)));
    }
  });
}

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

/**
 * Shuts the broker service down.
 * <p>
 * In order: unregisters this instance from the policies cache (when a configuration cache
 * exists), unloads namespace bundles, shuts down every replication client, then stops the event
 * loop groups, the periodic executors, the authentication service and the stats holder.
 *
 * @throws IOException declared by the signature; presumably raised by one of the close calls
 *                     below — TODO confirm
 */
@Override
public void close() throws IOException {
  log.info("Shutting down Pulsar Broker service");
  if (pulsar.getConfigurationCache() != null) {
    pulsar.getConfigurationCache().policiesCache().unregisterListener(this);
  }
  // Unload all namespace bundles gracefully before the rest of the service is torn down
  unloadNamespaceBundlesGracefully();
  // Close replication clients; a failure on one client is logged and does not stop the others
  replicationClients.forEach((cluster, client) -> {
    try {
      client.shutdown();
    } catch (PulsarClientException e) {
      log.warn("Error shutting down repl client for cluster {}", cluster, e);
    }
  });
  acceptorGroup.shutdownGracefully();
  workerGroup.shutdownGracefully();
  statsUpdater.shutdown();
  inactivityMonitor.shutdown();
  messageExpiryMonitor.shutdown();
  backlogQuotaChecker.shutdown();
  authenticationService.close();
  pulsarStats.close();
  log.info("Broker service completely shut down");
}

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache = pulsar().getBrokerService()
    .getDynamicConfigurationCache();
Map<String, String> configurationMap = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH)
    .orElse(null);
if (configurationMap != null) {
  configurationMap.put(configName, configValue);
  byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configurationMap);
  dynamicConfigurationCache.invalidate(BROKER_SERVICE_CONFIGURATION_PATH);
  serviceConfigZkVersion = localZk()
      .setData(BROKER_SERVICE_CONFIGURATION_PATH, content, serviceConfigZkVersion).getVersion();

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

/**
 * Disables ownership of a bundle both in the local state and on ZooKeeper.
 * <p>
 * The bundle state is updated locally first, then the JSON form of
 * {@code selfOwnerInfoDisabled} is written to the bundle's znode, and finally the read-only
 * ownership cache entry for that path is invalidated.
 *
 * @param bundle
 *            bundle whose ownership is being disabled
 * @throws Exception
 *             if the local update, the JSON serialization or the ZooKeeper write fails
 */
public void disableOwnership(NamespaceBundle bundle) throws Exception {
  String path = ServiceUnitZkUtils.path(bundle);
  updateBundleState(bundle, false);
  // Version -1: in the ZooKeeper API this matches any node version (unconditional write).
  localZkCache.getZooKeeper().setData(path, jsonMapper.writeValueAsBytes(selfOwnerInfoDisabled), -1);
  // Invalidate after the write so the next read through the cache does not reuse the old entry.
  ownershipReadOnlyCache.invalidate(path);
}

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

policiesNode = policiesCache().getWithStat(path("policies", property, cluster, namespace))
    .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace " + nsName + " does not exist"));
policiesNode.getKey().replication_clusters = clusterIds;
policiesCache().invalidate(path("policies", property, cluster, namespace));

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

pulsar.getConfigurationCache().policiesCache().registerListener(this);

代码示例来源:origin: com.yahoo.pulsar/pulsar-discovery-service

/**
 * Rebuilds the list of available brokers from the given broker node names.
 * <p>
 * Each name is resolved to its load report under {@code LOADBALANCE_BROKERS_ROOT}. A broker
 * whose report is absent is left out of the result instead of failing the whole refresh. The
 * new list replaces {@code this.availableBrokers} only after every name has been processed.
 *
 * @param brokerNodes names of the broker nodes currently listed
 * @throws Exception if reading a broker's load report fails
 */
private void updateBrokerList(Set<String> brokerNodes) throws Exception {
  List<LoadReport> reports = new ArrayList<>(brokerNodes.size());
  for (String broker : brokerNodes) {
    // A broker node can disappear between the listing and this read. An empty Optional then
    // means "no longer available", so skip it rather than let Optional.get() throw
    // NoSuchElementException and abort the update for all remaining brokers.
    brokerInfo.get(LOADBALANCE_BROKERS_ROOT + '/' + broker).ifPresent(reports::add);
  }
  this.availableBrokers = reports;
}

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

.get(nsIsolationPolicyPath).orElseGet(() -> {
        try {
          this.createNamespaceIsolationPolicyNode(nsIsolationPolicyPath);
      -1);
  namespaceIsolationPoliciesCache().invalidate(nsIsolationPolicyPath);
} catch (IllegalArgumentException iae) {
  log.info("[{}] Failed to update clusters/{}/namespaceIsolationPolicies/{}. Input data is invalid",

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker-common

CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
try {
  configCache.policiesCache().getAsync(POLICY_ROOT + destination.getNamespace()).thenAccept(policies -> {
    if (!policies.isPresent()) {
      if (log.isDebugEnabled()) {

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

LOG.debug("Successfully acquired zk lock on {}", namespaceBundleZNode);
ownershipReadOnlyCache.invalidate(namespaceBundleZNode);
future.complete(new OwnedBundle(
    ServiceUnitZkUtils.suBundleFromPath(namespaceBundleZNode, bundleFactory)));

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

/**
 * As any broker, stop the load manager.
 * <p>
 * Closes the active-brokers set, closes and then clears the broker data cache, and finally shuts
 * down the scheduler.
 *
 * @throws PulsarServerException
 *             If an unexpected error occurred when attempting to stop the load manager.
 */
@Override
public void stop() throws PulsarServerException {
  availableActiveBrokers.close();
  // close() is issued before clear() on the same cache; keep this order.
  brokerDataCache.close();
  brokerDataCache.clear();
  scheduler.shutdown();
}

相关文章

ZooKeeperDataCache类方法