Usage and code examples of the kafka.utils.ZkUtils.updatePersistentPath() method

Reposted by x33g5p2x on 2022-02-05 under Other

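This article collects several Java code examples of the kafka.utils.ZkUtils.updatePersistentPath() method and shows how ZkUtils.updatePersistentPath() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the ZkUtils.updatePersistentPath() method are as follows:
Package path: kafka.utils.ZkUtils
Class name: ZkUtils
Method name: updatePersistentPath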

Introduction to ZkUtils.updatePersistentPath

None available.
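The examples on this page all rely on the same behavior: the method is called with a znode path and a string value, and the caller does not create the node first. As far as I understand the Kafka source, updatePersistentPath overwrites the data of a persistent node and creates the node and any missing parents when they do not exist yet. The sketch below models only that update-or-create behavior with an in-memory map. The class InMemoryZnodes and its methods are hypothetical stand-ins for illustration; they are not Kafka or ZooKeeper code and ignore ACLs, sessions, and versions.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for a znode tree; NOT Kafka or ZooKeeper code. */
final class InMemoryZnodes {
    // Maps a full znode path to its string data.
    private final Map<String, String> nodes = new TreeMap<>();

    /** Overwrites the node's data, creating missing parents and the node itself first. */
    void updatePersistentPath(String path, String data) {
        if (!nodes.containsKey(path)) {
            createParents(path);
        }
        nodes.put(path, data);
    }

    /** Creates every ancestor of the given path with empty data if it is absent. */
    private void createParents(String path) {
        int slash = path.indexOf('/', 1);
        while (slash > 0) {
            nodes.putIfAbsent(path.substring(0, slash), "");
            slash = path.indexOf('/', slash + 1);
        }
    }

    /** Returns the node's data, or null if the node does not exist. */
    String read(String path) {
        return nodes.get(path);
    }

    boolean exists(String path) {
        return nodes.containsKey(path);
    }
}
```

Calling updatePersistentPath twice on the same path in this model simply replaces the stored value, which is why the offset-writing examples can call it without checking whether the node already exists.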

Code examples

Code example source: origin: OryxProject/oryx

/**
 * @param zkServers Zookeeper server string: host1:port1[,host2:port2,...]
 * @param groupID consumer group to update
 * @param offsets mapping of (topic and) partition to offset to push to Zookeeper
 */
public static void setOffsets(String zkServers,
               String groupID,
               Map<Pair<String,Integer>,Long> offsets) {
 ZkUtils zkUtils = ZkUtils.apply(zkServers, ZK_TIMEOUT_MSEC, ZK_TIMEOUT_MSEC, false);
 try {
  offsets.forEach((topicAndPartition, offset) -> {
   String topic = topicAndPartition.getFirst();
   int partition = topicAndPartition.getSecond();
   String partitionOffsetPath = "/consumers/" + groupID + "/offsets/" + topic + "/" + partition;
   zkUtils.updatePersistentPath(partitionOffsetPath,
                  Long.toString(offset),
                  ZkUtils$.MODULE$.defaultAcls(false, ""));
  });
 } finally {
  zkUtils.close();
 }
}
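The example above writes each offset to a path of the form /consumers/<group>/offsets/<topic>/<partition>. The following minimal sketch isolates that path construction so it can be checked without a ZooKeeper connection. The class OffsetPaths and its method are hypothetical helpers, not part of Kafka, and the rejection of "/" in the group and topic names is an assumption made here to keep the path depth predictable.

```java
/** Hypothetical helper that builds consumer offset znode paths; not part of Kafka. */
final class OffsetPaths {
    private OffsetPaths() {}

    /** Returns /consumers/<groupId>/offsets/<topic>/<partition>. */
    static String partitionOffsetPath(String groupId, String topic, int partition) {
        requireSegment(groupId, "groupId");
        requireSegment(topic, "topic");
        if (partition < 0) {
            throw new IllegalArgumentException("partition must be >= 0");
        }
        return "/consumers/" + groupId + "/offsets/" + topic + "/" + partition;
    }

    // Assumption: a path segment must be non-empty and must not contain '/'.
    private static void requireSegment(String value, String name) {
        if (value == null || value.isEmpty() || value.contains("/")) {
            throw new IllegalArgumentException(name + " must be a non-empty path segment");
        }
    }
}
```

With such a helper, the string concatenation inside the loop would become a single call, for example OffsetPaths.partitionOffsetPath(groupID, topic, partition).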

Code example source: origin: uber/chaperone

private static void putOffsetInfoIntoZk(String groupId, Map<String, Map<Integer, Long>> topicOffsetsMap) {
 ZkUtils zkUtils =
   ZkUtils.apply(AuditConfig.INGESTER_ZK_CONNECT, Integer.valueOf(AuditConfig.INGESTER_ZK_SESSION_TIMEOUT_MS),
     Integer.valueOf(AuditConfig.INGESTER_ZK_SESSION_TIMEOUT_MS), false);
 try {
  for (Map.Entry<String, Map<Integer, Long>> topicEntry : topicOffsetsMap.entrySet()) {
   String zkPath = String.format("%s/%s/offsets/%s/", ZkUtils.ConsumersPath(), groupId, topicEntry.getKey());
   for (Map.Entry<Integer, Long> offsetEntry : topicEntry.getValue().entrySet()) {
    logger.info("Put offset={} to partition={} with znode path={}", offsetEntry.getValue(), offsetEntry.getKey(),
      zkPath + offsetEntry.getKey());
    zkUtils.updatePersistentPath(zkPath + offsetEntry.getKey(), offsetEntry.getValue().toString(),
      zkUtils.DefaultAcls());
   }
  }
 } catch (Exception e) {
  logger.error("Got exception to put offset, with zkPathPrefix={}",
    String.format("%s/%s/offsets", ZkUtils.ConsumersPath(), groupId));
  throw e;
 } finally {
  zkUtils.close();
 }
}

Code example source: origin: org.opendaylight.centinel/centinel-laas

/**
 * @param zookeeperHosts
 *            Zookeeper hosts e.g. localhost:2181. If multiple zookeeper
 *            then host1:port1[,host2:port2,...]
 * @param groupID
 *            consumer group to update
 * @param offsets
 *            mapping of (topic and) partition to offset to push to
 *            Zookeeper
 */
public void createOffsets(String zookeeperHosts, String groupID, Map<TopicAndPartition, Long> offsets) {
  try (SuperZkClient zkClient = new SuperZkClient(zookeeperHosts)) {
    for (Map.Entry<TopicAndPartition, Long> entry : offsets.entrySet()) {
      TopicAndPartition topicAndPartition = entry.getKey();
      ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupID, topicAndPartition.topic());
      int partition = topicAndPartition.partition();
      long offset = entry.getValue();
      String partitionOffsetPath = topicDirs.consumerOffsetDir() + "/" + partition;
      ZkUtils.updatePersistentPath(zkClient, partitionOffsetPath, Long.toString(offset));
    }
  }
}

Code example source: origin: com.cloudera.oryx/kafka-util

/**
 * @param zkServers Zookeeper server string: host1:port1[,host2:port2,...]
 * @param groupID consumer group to update
 * @param offsets mapping of (topic and) partition to offset to push to Zookeeper
 */
public static void setOffsets(String zkServers,
               String groupID,
               Map<Pair<String,Integer>,Long> offsets) {
 ZkUtils zkUtils = ZkUtils.apply(zkServers, ZK_TIMEOUT_MSEC, ZK_TIMEOUT_MSEC, false);
 try {
  offsets.forEach((topicAndPartition, offset) -> {
   String topic = topicAndPartition.getFirst();
   int partition = topicAndPartition.getSecond();
   String partitionOffsetPath = "/consumers/" + groupID + "/offsets/" + topic + "/" + partition;
   zkUtils.updatePersistentPath(partitionOffsetPath,
                  Long.toString(offset),
                  ZkUtils$.MODULE$.defaultAcls(false, ""));
  });
 } finally {
  zkUtils.close();
 }
}
