com.taobao.metamorphosis.utils.ZkUtils类的使用及代码示例

x33g5p2x  于2022-02-05 转载在 其他  
字(9.0k)|赞(0)|评价(0)|浏览(77)

本文整理了Java中com.taobao.metamorphosis.utils.ZkUtils类的一些代码示例,展示了ZkUtils类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ZkUtils类的具体详情如下:
包路径:com.taobao.metamorphosis.utils.ZkUtils
类名称:ZkUtils

ZkUtils介绍

[英]ZooKeeper utility class
[中]zk工具类

代码示例

代码示例来源:origin: killme2008/Metamorphosis

final List<String> brokers = ZkUtils.getChildren(this.zkClient, this.brokerIdsPath + "/" + brokerId);
if (brokers == null) {
  return ret;
        ZkUtils.readDataMaybeNull(this.zkClient, this.brokerIdsPath + "/" + brokerId + "/" + broker);
    if (StringUtils.isNotBlank(brokerData)) {
      ret.add(new Broker(brokerId, brokerData + "?slaveId=" + slaveId));

代码示例来源:origin: killme2008/Metamorphosis

for (final String topic : topics) {
  List<String> partList = null;
  final List<String> brokers = ZkUtils.getChildren(this.zkClient, this.brokerTopicsSubPath + "/" + topic);
  for (final String broker : brokers) {
    final String[] tmp = StringUtils.split(broker, "-");
    if (tmp != null && tmp.length == 2) {
      String path = this.brokerTopicsSubPath + "/" + topic + "/" + broker;
      String brokerData = ZkUtils.readData(this.zkClient, path);
      try {
        final TopicBroker topicBroker = TopicBroker.parse(brokerData);

代码示例来源:origin: killme2008/Metamorphosis

throws Exception {
try {
  createEphemeralPath(client, path, data);
    storedData = readData(client, path);

代码示例来源:origin: killme2008/Metamorphosis

/**
 * Reads the master broker entry for the given broker id from ZooKeeper.
 *
 * @param brokerId id of the broker to look up
 * @return a Broker built from the stored data, or null when the stored data
 *         is missing or blank
 */
public Broker getMasterBrokerById(final int brokerId) {
  // NOTE(review): the -1 argument presumably selects the master entry --
  // confirm against brokerIdsPathOf.
  final String masterPath = this.brokerIdsPathOf(brokerId, -1);
  final String masterData = ZkUtils.readDataMaybeNull(this.zkClient, masterPath);
  return StringUtils.isNotBlank(masterData) ? new Broker(brokerId, masterData) : null;
}

代码示例来源:origin: killme2008/Metamorphosis

/**
 * Collects the topics that have a child node named "&lt;brokerId&gt;-m" under
 * the broker-topics subscription path.
 *
 * @param brokerId id of the broker whose "-m" nodes are searched for
 * @return the matching topic names; empty when the topics path has no
 *         children or nothing matches (never null)
 */
public Set<String> getTopicsByBrokerIdFromMaster(final int brokerId) {
  final Set<String> set = new HashSet<String>();
  final List<String> allTopics = ZkUtils.getChildren(this.zkClient, this.brokerTopicsSubPath);
  // Other callers in this code base treat a getChildren result as nullable;
  // guard here as well instead of failing with a NullPointerException.
  if (allTopics == null) {
    return set;
  }
  // The node name is loop-invariant, so build it once.
  final String masterNode = brokerId + "-m";
  for (final String topic : allTopics) {
    final List<String> brokers = ZkUtils.getChildren(this.zkClient, this.brokerTopicsSubPath + "/" + topic);
    if (brokers != null && brokers.contains(masterNode)) {
      set.add(topic);
    }
  }
  return set;
}

代码示例来源:origin: com.taobao.metamorphosis/metamorphosis-tools

/**
 * Writes {@code srcOffset} to the offset node resolved for the given group,
 * topic and partition.
 *
 * @param topic topic the offset belongs to
 * @param group consumer group the offset belongs to
 * @param partition partition the offset belongs to
 * @param srcOffset value stored verbatim as the node data
 * @throws Exception if resolving the path or updating the node fails
 */
public void setOffset(String topic, String group, Partition partition, String srcOffset) throws Exception {
    final String offsetPath = this.query.getOffsetPath(group, topic, partition);
    ZkUtils.updatePersistentPath(this.query.getZkClient(), offsetPath, srcOffset);
}
}

代码示例来源:origin: com.taobao.metamorphosis/metamorphosis-client

/**
   * Reads the comma-separated topic list stored in the registry node of the
   * given consumer.
   *
   * @param consumerId id of the consumer whose registry node is read
   * @return the topics in stored order; an empty list when the node data is blank
   * @throws Exception if reading the node fails
   */
  protected List<String> getTopics(final String consumerId) throws Exception {
    final String registryPath = this.dirs.consumerRegistryDir + "/" + consumerId;
    final String topicsString = ZkUtils.readData(ConsumerZooKeeper.this.zkClient, registryPath);
    if (StringUtils.isBlank(topicsString)) {
      return Collections.emptyList();
    }
    final String[] parts = topicsString.split(",");
    final List<String> rt = new ArrayList<String>(parts.length);
    Collections.addAll(rt, parts);
    return rt;
  }
}

代码示例来源:origin: com.taobao.metamorphosis/metamorphosis-client

for (int i = 0; i < MAX_N_RETRIES; i++) {
  ZkUtils.makeSurePersistentPathExists(this.zkClient, dirs.consumerRegistryDir);
  ZkUtils.createEphemeralPathExpectConflict(this.zkClient, dirs.consumerRegistryDir + "/"
      + loadBalanceListener.consumerIdString, topicString);
    ZkUtils.makeSurePersistentPathExists(this.zkClient, partitionPath);
    this.zkClient.subscribeChildChanges(partitionPath, loadBalanceListener);

代码示例来源:origin: killme2008/Metamorphosis

/**
 * Creates an ephemeral node holding {@code data} at {@code path}. When the
 * first attempt fails because a node on the path is missing, the parent path
 * is created and the creation is attempted once more.
 *
 * @param client ZooKeeper client used for the operation
 * @param path full path of the ephemeral node
 * @param data content stored in the node
 * @throws Exception if creating the parent path or the node itself fails
 */
public static void createEphemeralPath(final ZkClient client, final String path, final String data)
    throws Exception {
  try {
    client.createEphemeral(path, data);
    return;
  }
  catch (final ZkNoNodeException parentMissing) {
    // The first attempt reported a missing node: build the parent chain,
    // then fall through to the single retry below.
    createParentPath(client, path);
  }
  client.createEphemeral(path, data);
}

代码示例来源:origin: com.taobao.metamorphosis/metamorphosis-client

/**
 * Deletes the given ownership node on a best-effort basis: any failure is
 * logged and not propagated. The debug message is emitted whether or not the
 * deletion succeeded.
 *
 * @param znode path of the ownership node to delete
 */
private void deleteOwnership(final String znode) {
  try {
    ZkUtils.deletePath(ConsumerZooKeeper.this.zkClient, znode);
  }
  catch (final Throwable failure) {
    log.error("exception during releasePartitionOwnership", failure);
  }
  if (!log.isDebugEnabled()) {
    return;
  }
  log.debug("Consumer " + this.consumerIdString + " releasing " + znode);
}

代码示例来源:origin: com.taobao.metamorphosis/metamorphosis-client

/**
 * Tries to register {@code consumerThreadId} as owner of a partition by
 * creating an ephemeral node under the topic's owner directory.
 *
 * @param topicDirs directory layout providing the owner directory
 * @param partition partition name, used as the node name
 * @param topic topic the partition belongs to
 * @param consumerThreadId value stored in the owner node
 * @return true when the owner node was created and the partition info was
 *         registered; false when an owner node already exists
 * @throws Exception any failure other than the node already existing
 */
protected boolean ownPartition(final ZKGroupTopicDirs topicDirs, final String partition, final String topic,
    final String consumerThreadId) throws Exception {
  final String partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition;
  try {
    ZkUtils.createEphemeralPathExpectConflict(ConsumerZooKeeper.this.zkClient, partitionOwnerPath,
      consumerThreadId);
  }
  catch (final ZkNodeExistsException e) {
    // An owner node is still present. Presumably the previous ownership is
    // about to be deleted and the caller tries again later -- confirm.
    log.info("waiting for the partition ownership to be deleted: " + partition);
    return false;
  }
  // Every other exception propagates unchanged; the former
  // catch-and-rethrow of Exception added nothing and was removed.
  this.addPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId);
  return true;
}

代码示例来源:origin: com.taobao.metamorphosis/metamorphosis-client

/**
 * Ensures the topic's publish path exists, subscribes the listener to child
 * changes on it, and then asks the listener to update its broker info.
 *
 * @param topic topic being published
 * @param listener listener subscribed to the topic path
 */
private void publishTopicInternal(final String topic, final BrokerConnectionListener listener) throws Exception,
    NotifyRemotingException, InterruptedException {
  final String topicPath = this.metaZookeeper.brokerTopicsPubPath + "/" + topic;
  ZkUtils.makeSurePersistentPathExists(ProducerZooKeeper.this.zkClient, topicPath);
  ProducerZooKeeper.this.zkClient.subscribeChildChanges(topicPath, listener);
  // The original comment stresses that this update must be waited for
  // synchronously, so keep it as the last step.
  listener.syncedUpdateBrokersInfo();
}

代码示例来源:origin: killme2008/Metamorphosis

List<Partition> partList = null;
final String dataString =
    ZkUtils.readDataMaybeNull(this.zkClient, this.brokerTopicsPathOf(topic, false, brokerId, -1));
if (StringUtils.isBlank(dataString)) {
  continue;

代码示例来源:origin: killme2008/Metamorphosis

/**
 * Returns the master brokers publishing the given topic.
 *
 * @param topic topic whose child nodes are inspected
 * @return a sorted map from broker id to the broker's ZK string; empty when
 *         the topic path has no children or no master entry resolves
 */
public Map<Integer, String> getMasterBrokersByTopic(final String topic) {
  final Map<Integer, String> masters = new TreeMap<Integer, String>();
  final String topicPath = this.brokerTopicsPubPath + "/" + topic;
  final List<String> children = ZkUtils.getChildren(this.zkClient, topicPath);
  if (children != null) {
    for (final String child : children) {
      // Only nodes ending in "-m" are considered.
      if (child.endsWith("-m")) {
        // The broker id is the part before the first "-".
        final String idPart = StringUtils.split(child, "-")[0];
        final int brokerId = Integer.parseInt(idPart);
        final Broker master = this.getMasterBrokerById(brokerId);
        if (master != null) {
          masters.put(brokerId, master.getZKString());
        }
      }
    }
  }
  return masters;
}

代码示例来源:origin: com.taobao.metamorphosis/metamorphosis-client

// Store "<msgId>-<newOffset>" in the node named after the partition under
// the consumer offset directory.
ZkUtils.updatePersistentPath(this.zkClient, topicDirs.consumerOffsetDir + "/"
    + info.getPartition().toString(), msgId + "-" + newOffset);

代码示例来源:origin: killme2008/Metamorphosis

/**
 * Updates the value of a persistent node with the given path and data. When
 * the node does not exist yet, the parent path is created if necessary and
 * the node is created with the data.
 *
 * NOTE(review): the original comment promised that a node-exists error is
 * never thrown, but nothing here handles a node appearing between the failed
 * write and the create -- confirm whether that case needs handling.
 *
 * @param client ZooKeeper client used for the operation
 * @param path full path of the persistent node
 * @param data content written to the node
 * @throws Exception if writing, creating the parent path, or creating the
 *         node fails
 */
public static void updatePersistentPath(final ZkClient client, final String path, final String data)
    throws Exception {
  try {
    client.writeData(path, data);
  }
  catch (final ZkNoNodeException e) {
    // Nothing to overwrite yet: create the parents, then the node itself.
    createParentPath(client, path);
    client.createPersistent(path, data);
  }
  // Other exceptions propagate as they are; the former catch-and-rethrow of
  // Exception was redundant and was removed.
}

代码示例来源:origin: com.taobao.metamorphosis/metamorphosis-client

// Delete the node named after the listener's consumer id under the consumer
// registry directory.
ZkUtils.deletePath(this.zkClient, listener.dirs.consumerRegistryDir + "/"
    + listener.consumerIdString);

代码示例来源:origin: com.taobao.metamorphosis/metamorphosis-client

/**
 * Loads the stored consumption position of a group for one partition of a
 * topic.
 *
 * @param topic topic the position belongs to
 * @param group consumer group the position belongs to
 * @param partition partition the position belongs to
 * @return the parsed registration info, or null when no data is stored
 */
@Override
public TopicPartitionRegInfo load(final String topic, final String group, final Partition partition) {
  final ZKGroupTopicDirs topicDirs = this.metaZookeeper.new ZKGroupTopicDirs(topic, group);
  final String offsetNode = topicDirs.consumerOffsetDir + "/" + partition.toString();
  final String stored = ZkUtils.readDataMaybeNull(this.zkClient, offsetNode);
  if (stored == null) {
    return null;
  }
  final int dash = stored.lastIndexOf("-");
  if (dash <= 0) {
    // No separator after the first character: the value is a bare offset
    // (the format the original comments attribute to old clients).
    return new TopicPartitionRegInfo(topic, partition, Long.parseLong(stored));
  }
  // "<msgId>-<offset>", the format the original comments attribute to
  // clients from version 1.4 on.
  final long msgId = Long.parseLong(stored.substring(0, dash));
  final long offset = Long.parseLong(stored.substring(dash + 1));
  return new TopicPartitionRegInfo(topic, partition, offset, msgId);
}

代码示例来源:origin: killme2008/Metamorphosis

for (final String topic : topics) {
  List<Partition> partList = null;
  final List<String> brokers = ZkUtils.getChildren(this.zkClient, this.brokerTopicsPubPath + "/" + topic);
  for (final String broker : brokers) {
    final String[] brokerStrs = StringUtils.split(broker, "-");
    if (this.isMaster(brokerStrs)) {
      String path = this.brokerTopicsPubPath + "/" + topic + "/" + broker;
      String brokerData = ZkUtils.readData(this.zkClient, path);
      try {
        final TopicBroker topicBroker = TopicBroker.parse(brokerData);

代码示例来源:origin: killme2008/Metamorphosis

/**
 * Builds a Cluster from the broker ids registered under the broker-ids path.
 * Each child node name is parsed as a broker id, and the brokers resolved
 * for that id are added when there are any. The original comment says this
 * covers both slaves and masters.
 *
 * @return the assembled cluster; empty when the broker-ids path has no
 *         children
 */
public Cluster getCluster() {
  final Cluster cluster = new Cluster();
  final List<String> nodes = ZkUtils.getChildren(this.zkClient, this.brokerIdsPath);
  // Other callers in this code base treat a getChildren result as nullable;
  // guard here as well instead of failing with a NullPointerException.
  if (nodes == null) {
    return cluster;
  }
  for (final String node : nodes) {
    final int brokerId = Integer.parseInt(node);
    final Set<Broker> brokers = this.getBrokersById(brokerId);
    if (brokers != null && !brokers.isEmpty()) {
      cluster.addBroker(brokerId, brokers);
    }
  }
  return cluster;
}

相关文章