本文整理了Java中com.yahoo.pulsar.zookeeper.ZooKeeperDataCache类的一些代码示例,展示了ZooKeeperDataCache类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ZooKeeperDataCache类的具体详情如下:
包路径:com.yahoo.pulsar.zookeeper.ZooKeeperDataCache
类名称:ZooKeeperDataCache
暂无
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
/**
 * Looks up one setting in the dynamic configuration cached for a ZooKeeper path.
 *
 * @param zkPath path passed to the dynamic configuration cache
 * @param settingName key to read from the cached configuration
 * @param defaultValue value returned when the path has no cached data, the key has no value, or the
 *        lookup throws
 * @return the value found for {@code settingName}, otherwise {@code defaultValue}
 */
private String getDynamicConfigurationFromZK(String zkPath, String settingName, String defaultValue) {
    String resolved = defaultValue;
    try {
        resolved = dynamicConfigurationCache.get(zkPath)
                .map(settings -> settings.get(settingName))
                .orElse(defaultValue);
    } catch (Exception e) {
        // Best effort: a failed read is logged and the default is used instead.
        log.warn("Got exception when reading ZooKeeper path [{}]:", zkPath, e);
    }
    return resolved;
}
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
/**
 * Asynchronously resolves the local policies for {@code path}.
 *
 * <p>The parent lookup is tried first. When it yields an empty result, {@code createPolicies(path, true)}
 * is invoked and its outcome completes the returned future instead. A failure of either step completes
 * the returned future exceptionally.
 *
 * @param path path of the policies node
 * @return future completed with the policies found or created
 */
@Override
public CompletableFuture<Optional<LocalPolicies>> getAsync(String path) {
    final CompletableFuture<Optional<LocalPolicies>> result = new CompletableFuture<>();
    // Consult the local-zk cache before anything else
    super.getAsync(path).thenAccept(cached -> {
        if (cached.isPresent()) {
            result.complete(cached);
            return;
        }
        // Nothing found locally: create the policies node under Local ZK by copying it from Global ZK
        createPolicies(path, true).thenAccept(created -> {
            LOG.info("Successfully created local policies for {} -- {}", path, created);
            result.complete(created);
        }).exceptionally(createFailure -> {
            result.completeExceptionally(createFailure);
            return null;
        });
    }).exceptionally(lookupFailure -> {
        result.completeExceptionally(lookupFailure);
        return null;
    });
    return result;
}
};
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
/**
 * Stops this component by releasing the resources it holds, in this order: {@code loadReportCacheZk}
 * is closed and then cleared, {@code availableActiveBrokers} is closed, and {@code scheduler} is shut
 * down.
 *
 * @throws PulsarServerException declared in the signature; the visible body does not throw it explicitly
 */
@Override
public void stop() throws PulsarServerException {
// The cache is closed first and cleared afterwards.
loadReportCacheZk.close();
loadReportCacheZk.clear();
availableActiveBrokers.close();
scheduler.shutdown();
}
}
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
.get(nsIsolationPolicyPath).orElseGet(() -> {
try {
this.createNamespaceIsolationPolicyNode(nsIsolationPolicyPath);
-1);
namespaceIsolationPoliciesCache().invalidate(nsIsolationPolicyPath);
} catch (KeeperException.NoNodeException nne) {
log.warn("[{}] Failed to update brokers/{}/namespaceIsolationPolicies: Does not exist", clientAppId(),
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
policiesNode = policiesCache().getWithStat(path("policies", property, cluster, namespace))
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace " + nsName + " does not exist."));
ClusterData replClusterData = clustersCache().get(AdminResource.path("clusters", replCluster))
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Cluser " + replCluster + " does not exist"));
globalZk().setData(path("policies", property, cluster, namespace), jsonMapper().writeValueAsBytes(policies),
policiesNode.getValue().getVersion());
policiesCache().invalidate(path("policies", property, cluster, namespace));
} catch (Exception e) {
log.error("[{}] Failed to delete namespace on global ZK {}/{}/{}", clientAppId(), property, cluster,
globalZk().delete(globalZkPolicyPath, -1);
localZk().delete(lcaolZkPolicyPath, -1);
policiesCache().invalidate(globalZkPolicyPath);
localCacheService().policiesCache().invalidate(lcaolZkPolicyPath);
} catch (PulsarAdminException cae) {
throw new RestException(cae);
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
Optional<Map<String, String>> data = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH);
if (data.isPresent() && data.get() != null) {
data.get().forEach((key, value) -> {
dynamicConfigurationCache.registerListener(new ZooKeeperCacheListener<Map<String, String>>() {
@SuppressWarnings("unchecked")
@Override
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
/**
 * Creates the factory and sets up the asynchronous, per-namespace bundles cache.
 *
 * <p>The cache loader falls back to {@code getBundles(namespace, null)} when {@code pulsar} or its
 * configuration cache is {@code null}. Otherwise it reads the namespace's local policies asynchronously
 * and builds the bundles from the {@code bundles} field of those policies (or from {@code null} when no
 * policies exist). When a configuration cache is available, this factory is also registered as a
 * listener on the local policies cache.
 *
 * @param pulsar service used to reach the local ZK policies cache; may be {@code null}
 * @param hashFunc hash function kept by this factory
 */
public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) {
    this.hashFunc = hashFunc;

    this.bundlesCache = Caffeine.newBuilder().buildAsync((NamespaceName namespace, Executor executor) -> {
        final String policiesPath = AdminResource.joinPath(LOCAL_POLICIES_ROOT, namespace.toString());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Loading cache with bundles for {}", namespace);
        }

        // Evaluated on every load, not once at construction time.
        final boolean cacheAvailable = pulsar != null && pulsar.getConfigurationCache() != null;
        if (!cacheAvailable) {
            return CompletableFuture.completedFuture(getBundles(namespace, null));
        }

        CompletableFuture<NamespaceBundles> future = new CompletableFuture<>();
        // Static bundle data comes from the namespace policies
        pulsar.getLocalZkCacheService().policiesCache().getAsync(policiesPath).thenAccept(policies -> {
            // Without policies for the namespace, a single bundle is assumed
            BundlesData bundlesData = policies.map(p -> p.bundles).orElse(null);
            future.complete(getBundles(namespace, bundlesData));
        }).exceptionally(failure -> {
            future.completeExceptionally(failure);
            return null;
        });
        return future;
    });

    if (pulsar != null && pulsar.getConfigurationCache() != null) {
        pulsar.getLocalZkCacheService().policiesCache().registerListener(this);
    }

    this.pulsar = pulsar;
}
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
/**
 * Stores the resource quota of one <code>ServiceUnit</code>.
 *
 * <p>The quota path is derived from {@code suName}; the cached entry for that path is invalidated and
 * the quota is then saved to the znode.
 *
 * @param suName
 *            identifier of the <code>ServiceUnit</code>
 * @param quota
 *            <code>ResourceQuota</code> to store
 * @throws Exception
 *             if any of the calls made here fails
 */
public void setQuota(String suName, ResourceQuota quota) throws Exception {
    final String quotaPath = ResourceQuotaCache.path(suName);
    // Drop the cached entry before persisting the new quota.
    this.resourceQuotaCache.invalidate(quotaPath);
    this.saveQuotaToZnode(quotaPath, quota);
}
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
policiesNode = policiesCache().getWithStat(path("policies", property, cluster, namespace))
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace " + nsName + " does not exist"));
policiesNode.getKey().message_ttl_in_seconds = messageTTL;
policiesCache().invalidate(path("policies", property, cluster, namespace));
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
/**
 * Registers a listener that is notified when a given {@link ServiceConfiguration} field changes, so it
 * can react to the new value.
 *
 * <p>The listener is stored under {@code configKey}, and a cache listener is attached to the dynamic
 * configuration cache. For every update on {@code BROKER_SERVICE_CONFIGURATION_PATH} whose data contains
 * {@code configKey}, the raw value is converted with {@code FieldParser.value} and passed to
 * {@code listener}.
 *
 * <p>On notification, the listener should first check whether the value actually changed, take the
 * appropriate action, and then remember the new value so the next notification can be compared against
 * it.
 *
 * @param <T>
 *            type of the converted value handed to the listener
 * @param configKey
 *            configuration field name
 * @param listener
 *            callback that takes the appropriate action on a config-value change
 */
public <T> void registerConfigurationListener(String configKey, Consumer<T> listener) {
    configRegisteredListeners.put(configKey, listener);
    dynamicConfigurationCache.registerListener(new ZooKeeperCacheListener<Map<String, String>>() {
        @SuppressWarnings("unchecked")
        @Override
        public void onUpdate(String path, Map<String, String> data, Stat stat) {
            final boolean relevant = BROKER_SERVICE_CONFIGURATION_PATH.equalsIgnoreCase(path)
                    && data != null
                    && data.containsKey(configKey);
            if (!relevant) {
                // Update for another path, or one that does not carry this key.
                return;
            }
            log.info("Updating configuration {}/{}", configKey, data.get(configKey));
            listener.accept((T) FieldParser.value(data.get(configKey), dynamicConfigurationMap.get(configKey)));
        }
    });
}
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
/**
 * Shuts the broker service down.
 *
 * <p>In order: unregisters this object from the policies cache listeners (when a configuration cache is
 * present), unloads namespace bundles, shuts down every replication client, then stops the acceptor and
 * worker groups, the stats updater, the monitors, the backlog quota checker, the authentication service
 * and the stats holder. A start and an end message are logged.
 *
 * @throws IOException declared in the signature; the visible body does not throw it explicitly
 */
@Override
public void close() throws IOException {
log.info("Shutting down Pulsar Broker service");
// Stop receiving policy updates; skipped when no configuration cache exists.
if (pulsar.getConfigurationCache() != null) {
pulsar.getConfigurationCache().policiesCache().unregisterListener(this);
}
// unload all namespace bundles gracefully before tearing down the rest
unloadNamespaceBundlesGracefully();
// close replication clients; a failure on one client is logged and the others are still shut down
replicationClients.forEach((cluster, client) -> {
try {
client.shutdown();
} catch (PulsarClientException e) {
log.warn("Error shutting down repl client for cluster {}", cluster, e);
}
});
acceptorGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
statsUpdater.shutdown();
inactivityMonitor.shutdown();
messageExpiryMonitor.shutdown();
backlogQuotaChecker.shutdown();
authenticationService.close();
pulsarStats.close();
log.info("Broker service completely shut down");
}
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache = pulsar().getBrokerService()
.getDynamicConfigurationCache();
Map<String, String> configurationMap = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH)
.orElse(null);
if (configurationMap != null) {
configurationMap.put(configName, configValue);
byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configurationMap);
dynamicConfigurationCache.invalidate(BROKER_SERVICE_CONFIGURATION_PATH);
serviceConfigZkVersion = localZk()
.setData(BROKER_SERVICE_CONFIGURATION_PATH, content, serviceConfigZkVersion).getVersion();
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
/**
 * Disables a bundle both in the local state and on zk.
 *
 * <p>The local bundle state is updated with {@code false}, the bundle's znode data is overwritten with
 * the serialized {@code selfOwnerInfoDisabled} (version argument {@code -1}), and the entry for that
 * path is invalidated in the read-only ownership cache.
 *
 * @param bundle
 *            bundle whose ownership is disabled
 * @throws Exception
 *             if any of the calls made here fails
 */
public void disableOwnership(NamespaceBundle bundle) throws Exception {
    final String bundleZnode = ServiceUnitZkUtils.path(bundle);
    updateBundleState(bundle, false);
    localZkCache.getZooKeeper().setData(
            bundleZnode,
            jsonMapper.writeValueAsBytes(selfOwnerInfoDisabled),
            -1);
    // Invalidate only after the znode was written.
    ownershipReadOnlyCache.invalidate(bundleZnode);
}
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
policiesNode = policiesCache().getWithStat(path("policies", property, cluster, namespace))
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace " + nsName + " does not exist"));
policiesNode.getKey().replication_clusters = clusterIds;
policiesCache().invalidate(path("policies", property, cluster, namespace));
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
pulsar.getConfigurationCache().policiesCache().registerListener(this);
代码示例来源:origin: com.yahoo.pulsar/pulsar-discovery-service
/**
 * Rebuilds the list of available brokers from the given broker node names.
 *
 * <p>Each name is appended to {@code LOADBALANCE_BROKERS_ROOT} and looked up in {@code brokerInfo}.
 * Names whose lookup yields no data are skipped, so one missing entry no longer aborts the whole refresh
 * with a {@code NoSuchElementException} from an unchecked {@code Optional.get()}.
 *
 * @param brokerNodes names of the broker nodes to look up
 * @throws Exception if a lookup in {@code brokerInfo} fails
 */
private void updateBrokerList(Set<String> brokerNodes) throws Exception {
    List<LoadReport> availableBrokers = new ArrayList<>(brokerNodes.size());
    for (String broker : brokerNodes) {
        // Skip brokers with no data instead of failing the whole update.
        brokerInfo.get(LOADBALANCE_BROKERS_ROOT + '/' + broker).ifPresent(availableBrokers::add);
    }
    // Publish the new list in a single assignment once it is fully built.
    this.availableBrokers = availableBrokers;
}
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
.get(nsIsolationPolicyPath).orElseGet(() -> {
try {
this.createNamespaceIsolationPolicyNode(nsIsolationPolicyPath);
-1);
namespaceIsolationPoliciesCache().invalidate(nsIsolationPolicyPath);
} catch (IllegalArgumentException iae) {
log.info("[{}] Failed to update clusters/{}/namespaceIsolationPolicies/{}. Input data is invalid",
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker-common
CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
try {
configCache.policiesCache().getAsync(POLICY_ROOT + destination.getNamespace()).thenAccept(policies -> {
if (!policies.isPresent()) {
if (log.isDebugEnabled()) {
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
LOG.debug("Successfully acquired zk lock on {}", namespaceBundleZNode);
ownershipReadOnlyCache.invalidate(namespaceBundleZNode);
future.complete(new OwnedBundle(
ServiceUnitZkUtils.suBundleFromPath(namespaceBundleZNode, bundleFactory)));
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
/**
 * Stops the load manager by releasing the resources it holds, in this order:
 * {@code availableActiveBrokers} is closed, {@code brokerDataCache} is closed and then cleared, and
 * {@code scheduler} is shut down.
 *
 * @throws PulsarServerException
 *             If an unexpected error occurred when attempting to stop the load manager.
 */
@Override
public void stop() throws PulsarServerException {
availableActiveBrokers.close();
// The cache is closed first and cleared afterwards.
brokerDataCache.close();
brokerDataCache.clear();
scheduler.shutdown();
}
内容来源于网络,如有侵权,请联系作者删除!