com.yahoo.pulsar.zookeeper.ZooKeeperDataCache.get()方法的使用及代码示例

x33g5p2x  于2022-02-05 转载在 其他  
字(15.6k)|赞(0)|评价(0)|浏览(106)

本文整理了 Java 中 com.yahoo.pulsar.zookeeper.ZooKeeperDataCache.get() 方法的一些代码示例,展示了 ZooKeeperDataCache.get() 的具体用法。这些代码示例主要来源于 GitHub/Stack Overflow/Maven 等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ZooKeeperDataCache.get() 方法的具体详情如下:
包路径:com.yahoo.pulsar.zookeeper.ZooKeeperDataCache
类名称:ZooKeeperDataCache
方法名:get

ZooKeeperDataCache.get介绍

暂无

代码示例

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

/**
 * Looks up one setting in the value that {@code dynamicConfigurationCache} holds for {@code zkPath}.
 *
 * @param zkPath       path passed to {@code dynamicConfigurationCache.get}
 * @param settingName  key looked up in the cached value
 * @param defaultValue value returned when the node or the key is absent, or when the read throws
 * @return the value stored under {@code settingName}, or {@code defaultValue}
 */
private String getDynamicConfigurationFromZK(String zkPath, String settingName, String defaultValue) {
  String resolved = defaultValue;
  try {
    resolved = dynamicConfigurationCache.get(zkPath)
        .map(settings -> settings.get(settingName))
        .orElse(defaultValue);
  } catch (Exception e) {
    // Best effort: a failed read is logged and the default is used instead.
    log.warn("Got exception when reading ZooKeeper path [{}]:", zkPath, e);
  }
  return resolved;
}

代码示例来源:origin: com.yahoo.pulsar/pulsar-discovery-service

/**
 * Rebuilds {@code availableBrokers} from the entries that {@code brokerInfo} returns for the given nodes.
 *
 * @param brokerNodes node names appended to {@code LOADBALANCE_BROKERS_ROOT} to form each lookup path
 * @throws Exception if a lookup throws; note the result is unwrapped with an unchecked {@code get()}, so an
 *                   absent entry also surfaces as an exception
 */
private void updateBrokerList(Set<String> brokerNodes) throws Exception {
  List<LoadReport> reports = new ArrayList<>(brokerNodes.size());
  for (String node : brokerNodes) {
    String reportPath = LOADBALANCE_BROKERS_ROOT + '/' + node;
    LoadReport report = brokerInfo.get(reportPath).get();
    reports.add(report);
  }
  // Assign only after every lookup succeeded, so the field never holds a partially built list.
  this.availableBrokers = reports;
}

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

/**
 * Resolves the {@code destination_storage} backlog quota from the policies cached at {@code policyPath}.
 *
 * @param namespace  namespace name, used only in the error log message
 * @param policyPath path passed to {@code zkCache.get}
 * @return the quota found in the policies' {@code backlog_quota_map}; {@code defaultQuota} when the policies are
 *         absent, when the map has no such entry, or when the read throws
 */
public BacklogQuota getBacklogQuota(String namespace, String policyPath) {
  try {
    Optional<Policies> cached = zkCache.get(policyPath);
    if (cached.isPresent()) {
      return cached.get().backlog_quota_map.getOrDefault(BacklogQuotaType.destination_storage, defaultQuota);
    }
  } catch (Exception e) {
    log.error(String.format("Failed to read policies data, will apply the default backlog quota: namespace=%s",
        namespace), e);
  }
  // Reached when the policies are absent or the lookup failed.
  return this.defaultQuota;
}

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

/**
 * Reads the namespace isolation policies cached for a cluster.
 *
 * @param clusterName cluster whose {@code namespaceIsolationPolicies} path is looked up
 * @return the cached policies, or {@code null} when none are cached or the lookup throws
 */
private NamespaceIsolationPolicies getIsolationPolicies(String clusterName) {
  try {
    return namespaceIsolationPolicies
        .get(AdminResource.path("clusters", clusterName, "namespaceIsolationPolicies")).orElse(null);
  } catch (Exception e) {
    // The placeholder needs its own argument: a trailing Throwable is consumed as the exception to log and
    // would leave "[{}]" unfilled.
    LOG.warn("GetIsolationPolicies: Unable to get the namespaceIsolationPolicies [{}]", clusterName, e);
    return null;
  }
}

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

/**
 * Reads the resource quota cached for a znode.
 *
 * @param zpath path passed to {@code resourceQuotaCache.get}
 * @return the cached quota, or a freshly constructed {@code ResourceQuota} when nothing is cached or the read
 *         throws
 */
private ResourceQuota readQuotaFromZnode(String zpath) {
  try {
    return this.resourceQuotaCache.get(zpath).orElseGet(ResourceQuota::new);
  } catch (Exception e) {
    // Two placeholders need two non-Throwable arguments; the trailing Throwable only supplies the stack trace.
    LOG.warn("Failed to read quota from znode {}: {}", zpath, e.getMessage(), e);
    return new ResourceQuota();
  }
}

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

/**
 * Returns the admin configuration cached for a property.
 *
 * @param property property name taken from the request path
 * @return the cached {@code PropertyAdmin}
 * @throws RestException with {@code NOT_FOUND} when the property is not cached, or wrapping any other failure
 *                       raised by the lookup
 */
@GET
@Path("/{property}")
@ApiOperation(value = "Get the admin configuration for a given property.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
    @ApiResponse(code = 404, message = "Property doesn't exist") })
public PropertyAdmin getPropertyAdmin(@PathParam("property") String property) {
  validateSuperUserAccess();
  try {
    return propertiesCache().get(path("policies", property))
        .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Property does not exist"));
  } catch (RestException re) {
    // The NOT_FOUND raised above is an expected outcome: propagate it untouched instead of logging it as a
    // failure and re-wrapping it.
    throw re;
  } catch (Exception e) {
    log.error("[{}] Failed to get property {}", clientAppId(), property, e);
    throw new RestException(e);
  }
}

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

/**
 * Refreshes the broker map held by {@code loadData} from {@code brokerDataCache}.
 *
 * <p>For every broker returned by {@code getAvailableBrokers()} the cached {@code LocalBrokerData} is read and
 * either set on the existing {@code BrokerData} entry or wrapped in a new one. A broker whose read fails is
 * logged and skipped. Entries for brokers that are no longer active are removed afterwards.
 */
private void updateAllBrokerData() {
  final Set<String> activeBrokers = getAvailableBrokers();
  final Map<String, BrokerData> brokerDataMap = loadData.getBrokerData();
  for (String broker : activeBrokers) {
    try {
      String key = String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, broker);
      final LocalBrokerData localData = brokerDataCache.get(key)
          .orElseThrow(KeeperException.NoNodeException::new);
      if (brokerDataMap.containsKey(broker)) {
        // Replace previous local broker data.
        brokerDataMap.get(broker).setLocalData(localData);
      } else {
        // Initialize BrokerData object for previously unseen
        // brokers.
        brokerDataMap.put(broker, new BrokerData(localData));
      }
    } catch (Exception e) {
      log.warn("Error reading broker data from cache for broker - [{}], [{}]", broker, e.getMessage());
    }
  }
  // Remove obsolete brokers. Removal goes through the key-set view: calling Map.remove() while iterating
  // keySet() throws ConcurrentModificationException on fail-fast maps such as HashMap.
  brokerDataMap.keySet().removeIf(name -> !activeBrokers.contains(name));
}

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

/**
 * Returns the configuration data cached for a cluster.
 *
 * @param cluster cluster name taken from the request path
 * @return the cached {@code ClusterData}
 * @throws RestException with {@code NOT_FOUND} when the cluster is not cached, or wrapping any other failure
 *                       raised by the lookup
 */
@GET
@Path("/{cluster}")
@ApiOperation(value = "Get the configuration data for the specified cluster.", response = ClusterData.class)
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
    @ApiResponse(code = 404, message = "Cluster doesn't exist") })
public ClusterData getCluster(@PathParam("cluster") String cluster) {
  validateSuperUserAccess();
  try {
    return clustersCache().get(path("clusters", cluster))
        .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Cluster does not exist"));
  } catch (RestException re) {
    // Already a REST error: log it and pass it through unchanged.
    log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, re);
    throw re;
  } catch (Exception e) {
    // Anything else is wrapped so the caller only ever sees RestException.
    log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, e);
    throw new RestException(e);
  }
}

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

/**
 * Verifies that the given cluster has an entry in the clusters cache.
 *
 * @param cluster cluster name to check
 * @throws RestException with {@code PRECONDITION_FAILED} when the cluster is not cached, or wrapping any failure
 *                       raised by the lookup itself
 */
private void validateClusterExists(String cluster) {
  boolean exists;
  try {
    exists = clustersCache().get(path("clusters", cluster)).isPresent();
  } catch (Exception e) {
    throw new RestException(e);
  }
  // Thrown outside the try so the PRECONDITION_FAILED exception is not caught and re-wrapped by the handler
  // meant for lookup failures.
  if (!exists) {
    throw new RestException(Status.PRECONDITION_FAILED, "Cluster " + cluster + " does not exist.");
  }
}

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

/**
 * Returns the namespace isolation policies cached for a cluster.
 *
 * @param cluster cluster name taken from the request path
 * @return the map returned by the cached policies' {@code getPolicies()}
 * @throws RestException with {@code NOT_FOUND} when the cluster or its isolation policies are not cached, or
 *                       wrapping any other failure raised while reading the policies
 * @throws Exception     if the initial cluster lookup throws
 */
@GET
@Path("/{cluster}/namespaceIsolationPolicies")
@ApiOperation(value = "Get the namespace isolation policies assigned in the cluster", response = NamespaceIsolationData.class, responseContainer = "Map")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
    @ApiResponse(code = 404, message = "Cluster doesn't exist") })
public Map<String, NamespaceIsolationData> getNamespaceIsolationPolicies(@PathParam("cluster") String cluster)
    throws Exception {
  validateSuperUserAccess();
  if (!clustersCache().get(path("clusters", cluster)).isPresent()) {
    throw new RestException(Status.NOT_FOUND, "Cluster " + cluster + " does not exist.");
  }
  try {
    NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache()
        .get(path("clusters", cluster, "namespaceIsolationPolicies"))
        .orElseThrow(() -> new RestException(Status.NOT_FOUND,
            "NamespaceIsolationPolicies for cluster " + cluster + " does not exist"));
    // construct the response to NamespaceisolationData map
    return nsIsolationPolicies.getPolicies();
  } catch (RestException re) {
    // The NOT_FOUND raised above is an expected outcome: propagate it untouched instead of logging it as a
    // failure and re-wrapping it.
    throw re;
  } catch (Exception e) {
    log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies", clientAppId(), cluster, e);
    throw new RestException(e);
  }
}

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

/**
 * Returns the dynamic configuration map cached at {@code BROKER_SERVICE_CONFIGURATION_PATH}.
 *
 * @return the cached configuration map
 * @throws RestException with {@code NOT_FOUND} when nothing is cached at that path, or wrapping any other
 *                       failure raised by the lookup
 * @throws Exception     if obtaining the cache itself throws
 */
@GET
@Path("/configuration/values")
@ApiOperation(value = "Get value of all dynamic configurations' value overridden on local config")
@ApiResponses(value = { @ApiResponse(code = 404, message = "Configuration not found") })
public Map<String, String> getAllDynamicConfigurations() throws Exception {
  // Obtained outside the try, so a failure here propagates as-is rather than being logged and wrapped.
  final ZooKeeperDataCache<Map<String, String>> cache = pulsar().getBrokerService()
      .getDynamicConfigurationCache();
  try {
    return cache.get(BROKER_SERVICE_CONFIGURATION_PATH)
        .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Couldn't find configuration in zk"));
  } catch (RestException notFound) {
    LOG.error("[{}] couldn't find any configuration in zk {}", clientAppId(), notFound.getMessage(), notFound);
    throw notFound;
  } catch (Exception failure) {
    LOG.error("[{}] Failed to retrieve configuration from zk {}", clientAppId(), failure.getMessage(), failure);
    throw new RestException(failure);
  }
}

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

/**
 * Returns a single named namespace isolation policy of a cluster.
 *
 * @param cluster    cluster name taken from the request path
 * @param policyName name of the policy to return
 * @return the policy stored under {@code policyName}
 * @throws RestException with {@code NOT_FOUND} when the cluster has no cached isolation policies or no policy of
 *                       that name, or wrapping any other failure raised by the lookup
 * @throws Exception     declared by the signature; the visible body only throws {@code RestException}
 */
@GET
@Path("/{cluster}/namespaceIsolationPolicies/{policyName}")
@ApiOperation(value = "Get a single namespace isolation policy assigned in the cluster", response = NamespaceIsolationData.class)
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
    @ApiResponse(code = 404, message = "Policy doesn't exist"),
    @ApiResponse(code = 412, message = "Cluster doesn't exist") })
public NamespaceIsolationData getNamespaceIsolationPolicy(@PathParam("cluster") String cluster,
    @PathParam("policyName") String policyName) throws Exception {
  validateSuperUserAccess();
  validateClusterExists(cluster);
  try {
    NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache()
        .get(path("clusters", cluster, "namespaceIsolationPolicies"))
        .orElseThrow(() -> new RestException(Status.NOT_FOUND,
            "NamespaceIsolationPolicies for cluster " + cluster + " does not exist"));
    // construct the response to NamespaceisolationData map
    if (!nsIsolationPolicies.getPolicies().containsKey(policyName)) {
      // Three placeholders, three arguments: the leading "[{}]" is the client id.
      log.info("[{}] Cannot find NamespaceIsolationPolicy {} for cluster {}", clientAppId(), policyName,
          cluster);
      throw new RestException(Status.NOT_FOUND,
          "Cannot find NamespaceIsolationPolicy " + policyName + " for cluster " + cluster);
    }
    return nsIsolationPolicies.getPolicies().get(policyName);
  } catch (RestException re) {
    throw re;
  } catch (Exception e) {
    // policyName fills the last placeholder; the trailing Throwable only supplies the stack trace.
    log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies/{}", clientAppId(), cluster,
        policyName, e);
    throw new RestException(e);
  }
}

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

/**
 * Checks that {@code cluster} is in the allowed-clusters list of {@code property}.
 *
 * @param property property whose cached {@code PropertyAdmin} is consulted
 * @param cluster  cluster that must appear in {@code getAllowedClusters()}
 * @throws RestException with {@code NOT_FOUND} when the property is not cached, with {@code FORBIDDEN} when the
 *                       cluster is not allowed, or wrapping any other failure raised by the lookup
 */
protected void validateClusterForProperty(String property, String cluster) {
  PropertyAdmin propertyAdmin;
  try {
    propertyAdmin = pulsar().getConfigurationCache().propertiesCache().get(path("policies", property))
        .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Property does not exist"));
  } catch (RestException re) {
    // The NOT_FOUND raised above is propagated untouched rather than being re-wrapped.
    log.warn("Property [{}] does not exist", property);
    throw re;
  } catch (Exception e) {
    // Include the property name and the cause; the original message carried neither.
    log.error("Failed to get property admin data for property [{}]", property, e);
    throw new RestException(e);
  }
  // Check if property is allowed on the cluster
  if (!propertyAdmin.getAllowedClusters().contains(cluster)) {
    String msg = String.format("Cluster [%s] is not in the list of allowed clusters list for property [%s]",
        cluster, property);
    log.info(msg);
    throw new RestException(Status.FORBIDDEN, msg);
  }
  log.info("Successfully validated clusters on property [{}]", property);
}

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

/**
 * Rebuilds {@code currentLoadReports} from the cached load reports of the active brokers, then refreshes the
 * realtime resource quota and the load ranking.
 *
 * <p>The whole rebuild runs while holding the monitor of {@code currentLoadReports}. A broker whose report
 * cannot be read is logged and skipped; any other failure is logged and aborts the update.
 */
private void updateRanking() {
  try {
    synchronized (currentLoadReports) {
      currentLoadReports.clear();
      Set<String> activeBrokers = availableActiveBrokers.get();
      for (String broker : activeBrokers) {
        try {
          String key = String.format("%s/%s", LOADBALANCE_BROKERS_ROOT, broker);
          LoadReport lr = loadReportCacheZk.get(key)
              .orElseThrow(() -> new KeeperException.NoNodeException());
          ResourceUnit ru = new SimpleResourceUnit(String.format("http://%s", lr.getName()),
              fromLoadReport(lr));
          this.currentLoadReports.put(ru, lr);
        } catch (Exception e) {
          // The second placeholder gets the message; the trailing Throwable only supplies the stack trace.
          log.warn("Error reading load report from Cache for broker - [{}], [{}]", broker, e.getMessage(), e);
        }
      }
      updateRealtimeResourceQuota();
      doLoadRanking();
    }
  } catch (Exception e) {
    // Same here: without a separate argument the "[{}]" placeholder is never filled.
    log.warn("Error reading active brokers list from zookeeper while re-ranking load reports [{}]",
        e.getMessage(), e);
  }
}

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

/**
 * Writes the bundle data derived from {@code nsBundles} into the namespace's local policies node.
 *
 * <p>If no local policies are cached for the namespace, they are created first and then read again before
 * being updated.
 *
 * @param nsname    namespace whose local policies are updated; must not be {@code null}
 * @param nsBundles bundles converted with {@code getBundlesData}; must not be {@code null}
 * @param callback  callback passed to the {@code setData} call
 * @throws Exception if a cache lookup, the policy creation, or the serialization throws
 */
private void updateNamespaceBundles(NamespaceName nsname, NamespaceBundles nsBundles, StatCallback callback)
    throws Exception {
  checkNotNull(nsname);
  checkNotNull(nsBundles);
  final String path = joinPath(LOCAL_POLICIES_ROOT, nsname.toString());

  Optional<LocalPolicies> cached = pulsar.getLocalZkCacheService().policiesCache().get(path);
  if (!cached.isPresent()) {
    // Nothing in local ZK yet: create the policies (bounded wait), then read them back.
    this.pulsar.getLocalZkCacheService().createPolicies(path, false).get(cacheTimeOutInSec, SECONDS);
    cached = this.pulsar.getLocalZkCacheService().policiesCache().get(path);
  }

  final LocalPolicies localPolicies = cached.get();
  localPolicies.bundles = getBundlesData(nsBundles);
  this.pulsar.getLocalZkCache().getZooKeeper().setData(path,
      ObjectMapperFactory.getThreadLocal().writeValueAsBytes(localPolicies), -1, callback, null);
}

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

/**
 * Returns the namespace isolation policies cached for the cluster named in this broker's configuration.
 *
 * @return the cached policies, or a newly constructed empty {@code NamespaceIsolationPolicies} when none are
 *         cached
 * @throws Exception if the cache lookup throws
 */
private NamespaceIsolationPolicies getLocalNamespaceIsolationPolicies() throws Exception {
  final String clusterName = pulsar.getConfiguration().getClusterName();
  // Absent policies are treated as "empty/undefined" and represented by an empty object.
  return pulsar.getConfigurationCache().namespaceIsolationPoliciesCache()
      .get(AdminResource.path("clusters", clusterName, "namespaceIsolationPolicies"))
      .orElseGet(NamespaceIsolationPolicies::new);
}

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

/**
 * Starts the producers of the replicators whose region is listed in the namespace's
 * {@code replication_clusters} policy.
 *
 * <p>The cluster list is read from the policies so that replicators which are being disconnected and closed
 * are not restarted. If anything inside the try block throws, every replicator's producer is started instead.
 */
public void startReplProducers() {
  try {
    final Policies nsPolicies = brokerService.pulsar().getConfigurationCache().policiesCache()
        .get(AdminResource.path("policies", DestinationName.get(topic).getNamespace()))
        .orElseThrow(KeeperException.NoNodeException::new);
    if (nsPolicies.replication_clusters == null) {
      // No replication clusters configured: nothing to start.
      return;
    }
    final Set<String> allowedRegions = Sets.newTreeSet(nsPolicies.replication_clusters);
    replicators.forEach((region, replicator) -> {
      if (allowedRegions.contains(region)) {
        replicator.startProducer();
      }
    });
  } catch (Exception e) {
    if (log.isDebugEnabled()) {
      log.debug("[{}] Error getting policies while starting repl-producers {}", topic, e.getMessage());
    }
    // Fallback: start every replicator.
    replicators.forEach((region, replicator) -> replicator.startProducer());
  }
}

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

/**
 * Asks every subscription and replicator to expire messages using the namespace's
 * {@code message_ttl_in_seconds} policy.
 *
 * <p>Nothing is expired when the TTL is {@code 0}. A failure to read the policies (or any exception raised
 * while expiring) is only reported at debug level.
 */
@Override
public void checkMessageExpiry() {
  final DestinationName destination = DestinationName.get(topic);
  try {
    final Policies nsPolicies = brokerService.pulsar().getConfigurationCache().policiesCache()
        .get(AdminResource.path("policies", destination.getNamespace()))
        .orElseThrow(KeeperException.NoNodeException::new);
    if (nsPolicies.message_ttl_in_seconds == 0) {
      return;
    }
    subscriptions.forEach((subName, sub) -> sub.expireMessages(nsPolicies.message_ttl_in_seconds));
    replicators.forEach((region, replicator) -> replicator.expireMessages(nsPolicies.message_ttl_in_seconds));
  } catch (Exception e) {
    if (log.isDebugEnabled()) {
      log.debug("[{}] Error getting policies", topic);
    }
  }
}

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

/**
 * Verifies that {@code clientAppId} may administrate {@code property}.
 *
 * <p>The check is skipped unless both authentication and authorization are enabled in the configuration.
 * Roles listed in {@code getSuperUserRoles()} are always accepted; otherwise the role must appear in the
 * property's {@code getAdminRoles()}.
 *
 * @param pulsar      service providing the configuration and the properties cache
 * @param clientAppId role of the caller
 * @param property    property being accessed
 * @throws RestException with {@code FORBIDDEN} when the client is not authenticated, or {@code UNAUTHORIZED}
 *                       when the property does not exist or the role is not one of its admins
 * @throws Exception     if the cache lookup fails with anything other than a missing node
 */
protected static void validateAdminAccessOnProperty(PulsarService pulsar, String clientAppId, String property) throws RestException, Exception {
  if (!pulsar.getConfiguration().isAuthenticationEnabled()
      || !pulsar.getConfiguration().isAuthorizationEnabled()) {
    // Access control is switched off: nothing to validate.
    return;
  }

  log.debug("check admin access on property: {} - Authenticated: {} -- role: {}", property,
      (isClientAuthenticated(clientAppId)), clientAppId);
  if (!isClientAuthenticated(clientAppId)) {
    throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
  }

  if (pulsar.getConfiguration().getSuperUserRoles().contains(clientAppId)) {
    // Super-user has access to configure all the policies
    log.debug("granting access to super-user {} on property {}", clientAppId, property);
    return;
  }

  PropertyAdmin admin;
  try {
    admin = pulsar.getConfigurationCache().propertiesCache().get(path("policies", property))
        .orElseThrow(() -> new RestException(Status.UNAUTHORIZED, "Property does not exist"));
  } catch (KeeperException.NoNodeException e) {
    log.warn("Failed to get property admin data for non existing property {}", property);
    throw new RestException(Status.UNAUTHORIZED, "Property does not exist");
  }

  if (!admin.getAdminRoles().contains(clientAppId)) {
    throw new RestException(Status.UNAUTHORIZED,
        "Don't have permission to administrate resources on this property");
  }
  log.debug("Successfully authorized {} on property {}", clientAppId, property);
}

代码示例来源:origin: com.yahoo.pulsar/pulsar-broker

/**
 * Returns the policies cached for a namespace, with {@code bundles} replaced by the bundle data obtained from
 * the namespace bundle factory when that data is non-null.
 *
 * @param property  property part of the namespace
 * @param cluster   cluster part of the namespace
 * @param namespace local name of the namespace
 * @return the cached {@code Policies} object, possibly with its {@code bundles} field overwritten
 * @throws RestException with {@code NOT_FOUND} when no policies are cached, or wrapping any other failure
 */
protected Policies getNamespacePolicies(String property, String cluster, String namespace) {
  try {
    final Policies nsPolicies = policiesCache().get(AdminResource.path("policies", property, cluster, namespace))
        .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
    // fetch bundles from LocalZK-policies
    final NamespaceBundles nsBundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
        .getBundles(new NamespaceName(property, cluster, namespace));
    final BundlesData localBundles = NamespaceBundleFactory.getBundlesData(nsBundles);
    if (localBundles != null) {
      // Only override when bundle data is available; otherwise keep what the policies already carry.
      nsPolicies.bundles = localBundles;
    }
    return nsPolicies;
  } catch (RestException notFound) {
    throw notFound;
  } catch (Exception failure) {
    log.error("[{}] Failed to get namespace policies {}/{}/{}", clientAppId(), property, cluster, namespace,
        failure);
    throw new RestException(failure);
  }
}

相关文章

ZooKeeperDataCache类方法