本文整理了Java中com.yahoo.pulsar.zookeeper.ZooKeeperDataCache.get()方法的一些代码示例,展示了ZooKeeperDataCache.get()的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ZooKeeperDataCache.get()方法的具体详情如下:
包路径:com.yahoo.pulsar.zookeeper.ZooKeeperDataCache
类名称:ZooKeeperDataCache
方法名:get
暂无
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
/**
 * Reads one setting from the dynamic-configuration entry cached for a ZooKeeper path.
 *
 * @param zkPath path passed to {@code dynamicConfigurationCache.get}
 * @param settingName key passed to {@code get} on the cached value
 * @param defaultValue fallback used when the entry or the setting is absent, or when the read throws
 * @return the stored setting, or {@code defaultValue}
 */
private String getDynamicConfigurationFromZK(String zkPath, String settingName, String defaultValue) {
    String value;
    try {
        value = dynamicConfigurationCache.get(zkPath)
                .map(settings -> settings.get(settingName))
                .orElse(defaultValue);
    } catch (Exception e) {
        // Best effort: a failed read is logged and answered with the default.
        log.warn("Got exception when reading ZooKeeper path [{}]:", zkPath, e);
        value = defaultValue;
    }
    return value;
}
代码示例来源:origin: com.yahoo.pulsar/pulsar-discovery-service
/**
 * Rebuilds {@code availableBrokers} from the load reports cached for the given broker nodes.
 *
 * @param brokerNodes broker node names; each is appended to {@code LOADBALANCE_BROKERS_ROOT + '/'}
 * @throws Exception if a cache read fails; a missing report makes {@code Optional.get()} throw
 */
private void updateBrokerList(Set<String> brokerNodes) throws Exception {
    final List<LoadReport> reports = new ArrayList<>(brokerNodes.size());
    for (String brokerNode : brokerNodes) {
        final String reportPath = LOADBALANCE_BROKERS_ROOT + '/' + brokerNode;
        reports.add(brokerInfo.get(reportPath).get());
    }
    // Publish the new list only after every report was read successfully.
    this.availableBrokers = reports;
}
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
/**
 * Resolves the destination-storage backlog quota from the policies cached at {@code policyPath}.
 *
 * @param namespace namespace name, used only in the error log message
 * @param policyPath path passed to {@code zkCache.get}
 * @return the quota stored under {@code BacklogQuotaType.destination_storage}, or the default quota
 *         when no policies are cached, no such entry exists, or the read throws
 */
public BacklogQuota getBacklogQuota(String namespace, String policyPath) {
    try {
        Optional<Policies> cached = zkCache.get(policyPath);
        if (cached.isPresent()) {
            return cached.get().backlog_quota_map.getOrDefault(BacklogQuotaType.destination_storage, defaultQuota);
        }
    } catch (Exception e) {
        log.error(String.format("Failed to read policies data, will apply the default backlog quota: namespace=%s",
                namespace), e);
    }
    // Reached when the policies are absent or the read failed.
    return this.defaultQuota;
}
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
/**
 * Fetches the namespace isolation policies cached for a cluster.
 *
 * @param clusterName cluster whose {@code namespaceIsolationPolicies} path is read
 * @return the cached policies, or {@code null} when none are cached or the read throws
 */
private NamespaceIsolationPolicies getIsolationPolicies(String clusterName) {
    try {
        return namespaceIsolationPolicies
                .get(AdminResource.path("clusters", clusterName, "namespaceIsolationPolicies")).orElse(null);
    } catch (Exception e) {
        // The placeholder is filled with the cluster name; the exception is passed as the trailing
        // argument so the logger records it as the cause instead of consuming the placeholder.
        LOG.warn("GetIsolationPolicies: Unable to get the namespaceIsolationPolicies [{}]", clusterName, e);
        return null;
    }
}
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
/**
 * Reads the resource quota cached for a znode path.
 *
 * @param zpath path passed to {@code resourceQuotaCache.get}
 * @return the cached quota, or a freshly constructed {@code ResourceQuota} when nothing is cached
 *         or the read throws
 */
private ResourceQuota readQuotaFromZnode(String zpath) {
    ResourceQuota quota;
    try {
        quota = this.resourceQuotaCache.get(zpath).orElseGet(ResourceQuota::new);
    } catch (Exception e) {
        LOG.warn("Failed to read quota from znode {}: {}", zpath, e);
        quota = new ResourceQuota();
    }
    return quota;
}
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
/**
 * Returns the admin configuration cached for a property.
 *
 * @param property property name taken from the request path
 * @return the cached {@code PropertyAdmin}
 * @throws RestException with {@code Status.NOT_FOUND} when nothing is cached for the property;
 *         any other failure is wrapped in a new {@code RestException}
 */
@GET
@Path("/{property}")
@ApiOperation(value = "Get the admin configuration for a given property.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Property doesn't exist") })
public PropertyAdmin getPropertyAdmin(@PathParam("property") String property) {
    validateSuperUserAccess();
    try {
        return propertiesCache().get(path("policies", property))
                .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Property does not exist"));
    } catch (Exception e) {
        log.error("[{}] Failed to get property {}", clientAppId(), property, e);
        // Rethrow the NOT_FOUND RestException as-is so the documented 404 is not re-wrapped.
        if (e instanceof RestException) {
            throw (RestException) e;
        }
        throw new RestException(e);
    }
}
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
/**
 * Refreshes the local data of every active broker in the load-data map and drops entries for
 * brokers that are no longer active.
 *
 * A broker whose data cannot be read is logged and skipped; its previous entry, if any, is kept.
 */
private void updateAllBrokerData() {
    final Set<String> activeBrokers = getAvailableBrokers();
    final Map<String, BrokerData> brokerDataMap = loadData.getBrokerData();
    for (String broker : activeBrokers) {
        try {
            String key = String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, broker);
            final LocalBrokerData localData = brokerDataCache.get(key)
                    .orElseThrow(KeeperException.NoNodeException::new);
            if (brokerDataMap.containsKey(broker)) {
                // Replace previous local broker data.
                brokerDataMap.get(broker).setLocalData(localData);
            } else {
                // Initialize BrokerData object for previously unseen brokers.
                brokerDataMap.put(broker, new BrokerData(localData));
            }
        } catch (Exception e) {
            log.warn("Error reading broker data from cache for broker - [{}], [{}]", broker, e.getMessage());
        }
    }
    // Remove obsolete brokers. removeIf deletes through the key-set view safely; calling
    // Map.remove inside a for-each over keySet() throws ConcurrentModificationException on
    // non-concurrent map implementations.
    brokerDataMap.keySet().removeIf(broker -> !activeBrokers.contains(broker));
}
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
/**
 * Returns the configuration data cached for a cluster.
 *
 * @param cluster cluster name taken from the request path
 * @return the cached {@code ClusterData}
 * @throws RestException with {@code Status.NOT_FOUND} when nothing is cached for the cluster;
 *         any other failure is wrapped in a new {@code RestException}
 */
@GET
@Path("/{cluster}")
@ApiOperation(value = "Get the configuration data for the specified cluster.", response = ClusterData.class)
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Cluster doesn't exist") })
public ClusterData getCluster(@PathParam("cluster") String cluster) {
    validateSuperUserAccess();
    try {
        return clustersCache().get(path("clusters", cluster))
                .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Cluster does not exist"));
    } catch (RestException restFailure) {
        // Already carries its own status: log it and pass it through untouched.
        log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, restFailure);
        throw restFailure;
    } catch (Exception failure) {
        log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, failure);
        throw new RestException(failure);
    }
}
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
/**
 * Verifies that an entry is cached for the given cluster.
 *
 * @param cluster cluster name to look up
 * @throws RestException with {@code Status.PRECONDITION_FAILED} when the cluster is absent;
 *         a failed cache read is wrapped in a new {@code RestException}
 */
private void validateClusterExists(String cluster) {
    boolean exists;
    try {
        exists = clustersCache().get(path("clusters", cluster)).isPresent();
    } catch (Exception e) {
        throw new RestException(e);
    }
    // Thrown outside the try block so the PRECONDITION_FAILED status is not caught and
    // re-wrapped by the generic handler above.
    if (!exists) {
        throw new RestException(Status.PRECONDITION_FAILED, "Cluster " + cluster + " does not exist.");
    }
}
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
/**
 * Returns all namespace isolation policies cached for a cluster.
 *
 * @param cluster cluster name taken from the request path
 * @return the map returned by {@code NamespaceIsolationPolicies.getPolicies()}
 * @throws RestException with {@code Status.NOT_FOUND} when the cluster or its isolation policies
 *         are not cached; other failures while reading the policies are wrapped in a new
 *         {@code RestException}
 * @throws Exception if the initial cluster lookup fails
 */
@GET
@Path("/{cluster}/namespaceIsolationPolicies")
@ApiOperation(value = "Get the namespace isolation policies assigned in the cluster", response = NamespaceIsolationData.class, responseContainer = "Map")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Cluster doesn't exist") })
public Map<String, NamespaceIsolationData> getNamespaceIsolationPolicies(@PathParam("cluster") String cluster)
        throws Exception {
    validateSuperUserAccess();
    if (!clustersCache().get(path("clusters", cluster)).isPresent()) {
        throw new RestException(Status.NOT_FOUND, "Cluster " + cluster + " does not exist.");
    }
    try {
        NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache()
                .get(path("clusters", cluster, "namespaceIsolationPolicies"))
                .orElseThrow(() -> new RestException(Status.NOT_FOUND,
                        "NamespaceIsolationPolicies for cluster " + cluster + " does not exist"));
        // construct the response to NamespaceisolationData map
        return nsIsolationPolicies.getPolicies();
    } catch (Exception e) {
        log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies", clientAppId(), cluster, e);
        // Rethrow the NOT_FOUND RestException as-is so its status is not re-wrapped.
        if (e instanceof RestException) {
            throw (RestException) e;
        }
        throw new RestException(e);
    }
}
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
/**
 * Returns the dynamic-configuration map cached at {@code BROKER_SERVICE_CONFIGURATION_PATH}.
 *
 * @return the cached configuration map
 * @throws RestException with {@code Status.NOT_FOUND} when no configuration is cached; any other
 *         read failure is wrapped in a new {@code RestException}
 * @throws Exception declared by the signature
 */
@GET
@Path("/configuration/values")
@ApiOperation(value = "Get value of all dynamic configurations' value overridden on local config")
@ApiResponses(value = { @ApiResponse(code = 404, message = "Configuration not found") })
public Map<String, String> getAllDynamicConfigurations() throws Exception {
    final ZooKeeperDataCache<Map<String, String>> cache = pulsar().getBrokerService()
            .getDynamicConfigurationCache();
    try {
        return cache.get(BROKER_SERVICE_CONFIGURATION_PATH)
                .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Couldn't find configuration in zk"));
    } catch (RestException notFound) {
        LOG.error("[{}] couldn't find any configuration in zk {}", clientAppId(), notFound.getMessage(), notFound);
        throw notFound;
    } catch (Exception failure) {
        LOG.error("[{}] Failed to retrieve configuration from zk {}", clientAppId(), failure.getMessage(), failure);
        throw new RestException(failure);
    }
}
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
/**
 * Returns a single namespace isolation policy cached for a cluster.
 *
 * @param cluster cluster name taken from the request path
 * @param policyName name of the policy to return
 * @return the policy stored under {@code policyName}
 * @throws RestException with {@code Status.NOT_FOUND} when the cluster has no cached isolation
 *         policies or no policy with that name; other failures are wrapped in a new
 *         {@code RestException}
 * @throws Exception declared by the signature
 */
@GET
@Path("/{cluster}/namespaceIsolationPolicies/{policyName}")
@ApiOperation(value = "Get a single namespace isolation policy assigned in the cluster", response = NamespaceIsolationData.class)
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Policy doesn't exist"),
        @ApiResponse(code = 412, message = "Cluster doesn't exist") })
public NamespaceIsolationData getNamespaceIsolationPolicy(@PathParam("cluster") String cluster,
        @PathParam("policyName") String policyName) throws Exception {
    validateSuperUserAccess();
    validateClusterExists(cluster);
    try {
        NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache()
                .get(path("clusters", cluster, "namespaceIsolationPolicies"))
                .orElseThrow(() -> new RestException(Status.NOT_FOUND,
                        "NamespaceIsolationPolicies for cluster " + cluster + " does not exist"));
        // construct the response to NamespaceisolationData map
        if (!nsIsolationPolicies.getPolicies().containsKey(policyName)) {
            // Three placeholders need three arguments: client id, policy name, cluster.
            log.info("[{}] Cannot find NamespaceIsolationPolicy {} for cluster {}", clientAppId(), policyName,
                    cluster);
            throw new RestException(Status.NOT_FOUND,
                    "Cannot find NamespaceIsolationPolicy " + policyName + " for cluster " + cluster);
        }
        return nsIsolationPolicies.getPolicies().get(policyName);
    } catch (RestException re) {
        throw re;
    } catch (Exception e) {
        // policyName fills the last placeholder; the exception stays the trailing argument.
        log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies/{}", clientAppId(), cluster,
                policyName, e);
        throw new RestException(e);
    }
}
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
/**
 * Verifies that a property exists and that the given cluster is in its allowed-clusters list.
 *
 * @param property property whose admin data is read from the properties cache
 * @param cluster cluster that must appear in {@code getAllowedClusters()}
 * @throws RestException with {@code Status.NOT_FOUND} when the property is not cached, with
 *         {@code Status.FORBIDDEN} when the cluster is not allowed; other read failures are
 *         wrapped in a new {@code RestException}
 */
protected void validateClusterForProperty(String property, String cluster) {
    PropertyAdmin propertyAdmin;
    try {
        propertyAdmin = pulsar().getConfigurationCache().propertiesCache().get(path("policies", property))
                .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Property does not exist"));
    } catch (Exception e) {
        // Name the property and attach the cause so the failure can be diagnosed from the log.
        log.error("Failed to get property admin data for property [{}]", property, e);
        // Rethrow the NOT_FOUND RestException as-is so its status is not re-wrapped.
        if (e instanceof RestException) {
            throw (RestException) e;
        }
        throw new RestException(e);
    }
    // Check if property is allowed on the cluster
    if (!propertyAdmin.getAllowedClusters().contains(cluster)) {
        String msg = String.format("Cluster [%s] is not in the list of allowed clusters list for property [%s]",
                cluster, property);
        log.info(msg);
        throw new RestException(Status.FORBIDDEN, msg);
    }
    log.info("Successfully validated clusters on property [{}]", property);
}
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
/**
 * Rebuilds {@code currentLoadReports} from the load reports cached for the active brokers, then
 * refreshes the realtime resource quota and the load ranking.
 *
 * All of this runs while holding the monitor of {@code currentLoadReports}. A broker whose report
 * cannot be read is logged and skipped.
 */
private void updateRanking() {
    try {
        synchronized (currentLoadReports) {
            currentLoadReports.clear();
            for (String broker : availableActiveBrokers.get()) {
                try {
                    final String reportPath = String.format("%s/%s", LOADBALANCE_BROKERS_ROOT, broker);
                    final LoadReport report = loadReportCacheZk.get(reportPath)
                            .orElseThrow(KeeperException.NoNodeException::new);
                    final ResourceUnit unit = new SimpleResourceUnit(
                            String.format("http://%s", report.getName()), fromLoadReport(report));
                    this.currentLoadReports.put(unit, report);
                } catch (Exception e) {
                    log.warn("Error reading load report from Cache for broker - [{}], [{}]", broker, e);
                }
            }
            updateRealtimeResourceQuota();
            doLoadRanking();
        }
    } catch (Exception e) {
        log.warn("Error reading active brokers list from zookeeper while re-ranking load reports [{}]", e);
    }
}
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
/**
 * Writes a new bundle range into the local policies of a namespace, creating the policies node
 * first when none is cached.
 *
 * @param nsname
 *            namespace whose local policies are updated; must not be null
 * @param nsBundles
 *            bundles converted via {@code getBundlesData} and stored in the policies; must not be null
 * @param callback
 *            callback handed to the ZooKeeper {@code setData} call
 * @throws Exception
 *             if reading, creating or serializing the policies fails
 */
private void updateNamespaceBundles(NamespaceName nsname, NamespaceBundles nsBundles, StatCallback callback)
        throws Exception {
    checkNotNull(nsname);
    checkNotNull(nsBundles);
    final String policiesPath = joinPath(LOCAL_POLICIES_ROOT, nsname.toString());
    Optional<LocalPolicies> cached = pulsar.getLocalZkCacheService().policiesCache().get(policiesPath);
    if (!cached.isPresent()) {
        // Nothing cached for this namespace yet: create the policies, then read them back.
        pulsar.getLocalZkCacheService().createPolicies(policiesPath, false).get(cacheTimeOutInSec, SECONDS);
        cached = pulsar.getLocalZkCacheService().policiesCache().get(policiesPath);
    }
    cached.get().bundles = getBundlesData(nsBundles);
    pulsar.getLocalZkCache().getZooKeeper().setData(policiesPath,
            ObjectMapperFactory.getThreadLocal().writeValueAsBytes(cached.get()), -1, callback, null);
}
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
/**
 * Returns the namespace isolation policies cached for the cluster named in the local configuration.
 *
 * @return the cached policies, or a new empty {@code NamespaceIsolationPolicies} when none are cached
 * @throws Exception if the cache read fails
 */
private NamespaceIsolationPolicies getLocalNamespaceIsolationPolicies() throws Exception {
    final String clusterName = pulsar.getConfiguration().getClusterName();
    // the namespace isolation policies are empty/undefined = an empty object
    return pulsar.getConfigurationCache().namespaceIsolationPoliciesCache()
            .get(AdminResource.path("clusters", clusterName, "namespaceIsolationPolicies"))
            .orElseGet(NamespaceIsolationPolicies::new);
}
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
/**
 * Starts the producers of those replicators whose region is listed in the namespace's
 * {@code replication_clusters} policy.
 *
 * When the policies cannot be read (or anything else in the lookup throws), every replicator's
 * producer is started instead.
 */
public void startReplProducers() {
    // read repl-cluster from policies to avoid restart of replicator which are in process of disconnect and close
    try {
        final Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
                .get(AdminResource.path("policies", DestinationName.get(topic).getNamespace()))
                .orElseThrow(KeeperException.NoNodeException::new);
        if (policies.replication_clusters == null) {
            return;
        }
        final Set<String> configuredClusters = Sets.newTreeSet(policies.replication_clusters);
        replicators.forEach((region, replicator) -> {
            if (!configuredClusters.contains(region)) {
                return;
            }
            replicator.startProducer();
        });
    } catch (Exception e) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Error getting policies while starting repl-producers {}", topic, e.getMessage());
        }
        replicators.forEach((region, replicator) -> replicator.startProducer());
    }
}
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
/**
 * Expires messages on every subscription and replicator using the namespace's
 * {@code message_ttl_in_seconds} policy.
 *
 * Does nothing when that value is 0. Any failure is only reported at debug level.
 */
@Override
public void checkMessageExpiry() {
    final DestinationName name = DestinationName.get(topic);
    try {
        final Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
                .get(AdminResource.path("policies", name.getNamespace()))
                .orElseThrow(KeeperException.NoNodeException::new);
        if (policies.message_ttl_in_seconds == 0) {
            return;
        }
        subscriptions.forEach((subName, sub) -> sub.expireMessages(policies.message_ttl_in_seconds));
        replicators.forEach((region, replicator) -> replicator.expireMessages(policies.message_ttl_in_seconds));
    } catch (Exception e) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Error getting policies", topic);
        }
    }
}
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
/**
 * Checks that a client may administrate a property.
 *
 * Returns immediately unless both authentication and authorization are enabled in the
 * configuration. Super-user roles are accepted without consulting the property's admin roles.
 *
 * @param pulsar service providing the configuration and the properties cache
 * @param clientAppId role of the calling client
 * @param property property being accessed
 * @throws RestException with {@code Status.FORBIDDEN} when the client is not authenticated, or
 *         {@code Status.UNAUTHORIZED} when the property is not found or the client is not among
 *         its admin roles
 * @throws Exception if the properties cache read fails for another reason
 */
protected static void validateAdminAccessOnProperty(PulsarService pulsar, String clientAppId, String property)
        throws RestException, Exception {
    if (!pulsar.getConfiguration().isAuthenticationEnabled()
            || !pulsar.getConfiguration().isAuthorizationEnabled()) {
        return;
    }
    log.debug("check admin access on property: {} - Authenticated: {} -- role: {}", property,
            (isClientAuthenticated(clientAppId)), clientAppId);
    if (!isClientAuthenticated(clientAppId)) {
        throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
    }
    if (pulsar.getConfiguration().getSuperUserRoles().contains(clientAppId)) {
        // Super-user has access to configure all the policies
        log.debug("granting access to super-user {} on property {}", clientAppId, property);
        return;
    }
    PropertyAdmin propertyAdmin;
    try {
        propertyAdmin = pulsar.getConfigurationCache().propertiesCache().get(path("policies", property))
                .orElseThrow(() -> new RestException(Status.UNAUTHORIZED, "Property does not exist"));
    } catch (KeeperException.NoNodeException e) {
        log.warn("Failed to get property admin data for non existing property {}", property);
        throw new RestException(Status.UNAUTHORIZED, "Property does not exist");
    }
    if (!propertyAdmin.getAdminRoles().contains(clientAppId)) {
        throw new RestException(Status.UNAUTHORIZED,
                "Don't have permission to administrate resources on this property");
    }
    log.debug("Successfully authorized {} on property {}", clientAppId, property);
}
代码示例来源:origin: com.yahoo.pulsar/pulsar-broker
/**
 * Returns the policies cached for a namespace, with {@code bundles} replaced by the bundle data
 * obtained from the namespace bundle factory when that data is non-null.
 *
 * @param property property part of the namespace
 * @param cluster cluster part of the namespace
 * @param namespace local namespace name
 * @return the cached {@code Policies}
 * @throws RestException with {@code Status.NOT_FOUND} when no policies are cached; other failures
 *         are wrapped in a new {@code RestException}
 */
protected Policies getNamespacePolicies(String property, String cluster, String namespace) {
    try {
        final Policies policies = policiesCache().get(AdminResource.path("policies", property, cluster, namespace))
                .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
        // fetch bundles from LocalZK-policies
        final NamespaceBundles nsBundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
                .getBundles(new NamespaceName(property, cluster, namespace));
        final BundlesData bundleData = NamespaceBundleFactory.getBundlesData(nsBundles);
        if (bundleData != null) {
            policies.bundles = bundleData;
        }
        return policies;
    } catch (RestException restFailure) {
        throw restFailure;
    } catch (Exception failure) {
        log.error("[{}] Failed to get namespace policies {}/{}/{}", clientAppId(), property, cluster, namespace,
                failure);
        throw new RestException(failure);
    }
}
内容来源于网络,如有侵权,请联系作者删除!