org.apache.samza.zk.ZkUtils.getZkClient()方法的使用及代码示例

x33g5p2x  于2022-02-05 转载在 其他  
字(9.7k)|赞(0)|评价(0)|浏览(152)

本文整理了Java中org.apache.samza.zk.ZkUtils.getZkClient()方法的一些代码示例,展示了ZkUtils.getZkClient()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ZkUtils.getZkClient()方法的具体详情如下:
包路径:org.apache.samza.zk.ZkUtils
类名称:ZkUtils
方法名:getZkClient

ZkUtils.getZkClient介绍

暂无

代码示例

代码示例来源:origin: apache/samza

@Override
 public void countDown() {
  // Create a persistent sequential node beneath the latch path; participantId is handed to the
  // ZK client as the second argument of the create call.
  final String childPrefix = latchPath + "/";
  final String createdPath = zkUtils.getZkClient().createPersistentSequential(childPrefix, participantId);
  LOG.debug("ZKProcessorLatch countDown created " + createdPath);
 }
}

代码示例来源:origin: apache/samza

@Override
public void await(long timeout, TimeUnit timeUnit) throws TimeoutException {
 // A false return from waitUntilExists is how the ZK client reports that the wait timed out; it does
 // not throw. The Latch interface contract expects a TimeoutException in that case, so translate it here.
 if (zkUtils.getZkClient().waitUntilExists(targetPath, timeUnit, timeout)) {
  return;
 }
 throw new TimeoutException("Timed out waiting for the targetPath");
}

代码示例来源:origin: org.apache.samza/samza-core_2.11

@Override
public void await(long timeout, TimeUnit timeUnit) throws TimeoutException {
 // A false return from waitUntilExists is how the ZK client reports that the wait timed out; it does
 // not throw. The Latch interface contract expects a TimeoutException in that case, so translate it here.
 if (zkUtils.getZkClient().waitUntilExists(targetPath, timeUnit, timeout)) {
  return;
 }
 throw new TimeoutException("Timed out waiting for the targetPath");
}

代码示例来源:origin: org.apache.samza/samza-core_2.12

@Override
public void await(long timeout, TimeUnit timeUnit) throws TimeoutException {
 // A false return from waitUntilExists is how the ZK client reports that the wait timed out; it does
 // not throw. The Latch interface contract expects a TimeoutException in that case, so translate it here.
 if (zkUtils.getZkClient().waitUntilExists(targetPath, timeUnit, timeout)) {
  return;
 }
 throw new TimeoutException("Timed out waiting for the targetPath");
}

代码示例来源:origin: org.apache.samza/samza-core_2.12

@Override
 public void countDown() {
  // Create a persistent sequential node beneath the latch path; participantId is handed to the
  // ZK client as the second argument of the create call.
  final String childPrefix = latchPath + "/";
  final String createdPath = zkUtils.getZkClient().createPersistentSequential(childPrefix, participantId);
  LOG.debug("ZKProcessorLatch countDown created " + createdPath);
 }
}

代码示例来源:origin: org.apache.samza/samza-core_2.11

@Override
 public void countDown() {
  // Create a persistent sequential node beneath the latch path; participantId is handed to the
  // ZK client as the second argument of the create call.
  final String childPrefix = latchPath + "/";
  final String createdPath = zkUtils.getZkClient().createPersistentSequential(childPrefix, participantId);
  LOG.debug("ZKProcessorLatch countDown created " + createdPath);
 }
}

代码示例来源:origin: org.apache.samza/samza-core

@Override
 public void countDown() {
  // Create a persistent sequential node beneath the latch path; participantId is handed to the
  // ZK client as the second argument of the create call.
  final String childPrefix = latchPath + "/";
  final String createdPath = zkUtils.getZkClient().createPersistentSequential(childPrefix, participantId);
  LOG.debug("ZKProcessorLatch countDown created " + createdPath);
 }
}

代码示例来源:origin: org.apache.samza/samza-core_2.10

@Override
public void await(long timeout, TimeUnit timeUnit) throws TimeoutException {
 // A false return from waitUntilExists is how the ZK client reports that the wait timed out; it does
 // not throw. The Latch interface contract expects a TimeoutException in that case, so translate it here.
 if (zkUtils.getZkClient().waitUntilExists(targetPath, timeUnit, timeout)) {
  return;
 }
 throw new TimeoutException("Timed out waiting for the targetPath");
}

代码示例来源:origin: org.apache.samza/samza-core

@Override
public void await(long timeout, TimeUnit timeUnit) throws TimeoutException {
 // A false return from waitUntilExists is how the ZK client reports that the wait timed out; it does
 // not throw. The Latch interface contract expects a TimeoutException in that case, so translate it here.
 if (zkUtils.getZkClient().waitUntilExists(targetPath, timeUnit, timeout)) {
  return;
 }
 throw new TimeoutException("Timed out waiting for the targetPath");
}

代码示例来源:origin: org.apache.samza/samza-core_2.10

@Override
 public void countDown() {
  // Create a persistent sequential node beneath the latch path; participantId is handed to the
  // ZK client as the second argument of the create call.
  final String childPrefix = latchPath + "/";
  final String createdPath = zkUtils.getZkClient().createPersistentSequential(childPrefix, participantId);
  LOG.debug("ZKProcessorLatch countDown created " + createdPath);
 }
}

代码示例来源:origin: apache/samza

/**
 * Expires the barrier version by marking it as TIMED_OUT.
 *
 * <p>The barrier state is only overwritten when the value read from ZK equals {@code State.NEW}.
 * For any other value (including {@code null}) nothing is written and the skip is logged at
 * debug level.
 *
 * <p>NOTE(review): the read and the write are two separate ZK calls, so the state may change in
 * between — confirm whether callers depend on this being atomic.
 *
 * @param version Version associated with the Barrier
 */
public void expire(String version) {
 String barrierStatePath = keyBuilder.getBarrierStatePath(version);
 State barrierState = zkUtils.getZkClient().readData(barrierStatePath);
 if (Objects.equals(barrierState, State.NEW)) {
  // Placeholder-based logging defers message formatting until the level is known to be enabled.
  LOG.info("Expiring the barrier version: {}. Marking the barrier state: {} as {}.", version, barrierStatePath, State.TIMED_OUT);
  // Reuse the path resolved above instead of asking the key builder for it a second time.
  zkUtils.writeData(barrierStatePath, State.TIMED_OUT);
 } else {
  LOG.debug("Barrier version: {} is at: {} state. Not marking barrier as {}.", version, barrierState, State.TIMED_OUT);
 }
}

代码示例来源:origin: org.apache.samza/samza-core_2.12

/**
 * Expires the barrier version by marking it as TIMED_OUT.
 *
 * <p>The barrier state is only overwritten when the value read from ZK equals {@code State.NEW}.
 * For any other value (including {@code null}) nothing is written and the skip is logged at
 * debug level.
 *
 * <p>NOTE(review): the read and the write are two separate ZK calls, so the state may change in
 * between — confirm whether callers depend on this being atomic.
 *
 * @param version Version associated with the Barrier
 */
public void expire(String version) {
 String barrierStatePath = keyBuilder.getBarrierStatePath(version);
 State barrierState = zkUtils.getZkClient().readData(barrierStatePath);
 if (Objects.equals(barrierState, State.NEW)) {
  // Placeholder-based logging defers message formatting until the level is known to be enabled.
  LOG.info("Expiring the barrier version: {}. Marking the barrier state: {} as {}.", version, barrierStatePath, State.TIMED_OUT);
  // Reuse the path resolved above instead of asking the key builder for it a second time.
  zkUtils.writeData(barrierStatePath, State.TIMED_OUT);
 } else {
  LOG.debug("Barrier version: {} is at: {} state. Not marking barrier as {}.", version, barrierState, State.TIMED_OUT);
 }
}

代码示例来源:origin: apache/samza

/**
 * Registers a participant with the shared barrier for the given version.
 *
 * <p>First subscribes to data changes on the barrier state path, then creates a persistent node
 * named after the participant beneath the barrier's participants path.
 *
 * @param version Version associated with the Barrier
 * @param participantId Identifier of the participant
 */
public void join(String version, String participantId) {
 LOG.info("Joining the barrier version: {} as participant: {}.", version, participantId);

 final String statePath = keyBuilder.getBarrierStatePath(version);
 LOG.info("Subscribing data changes on the path: {} for barrier version: {}.", statePath, version);
 final ZkBarrierReachedHandler stateChangeHandler = new ZkBarrierReachedHandler(statePath, version, zkUtils);
 zkUtils.subscribeDataChanges(statePath, stateChangeHandler);

 // TODO: Handle ZkNodeExistsException - SAMZA-1304
 final String participantNodePath =
   String.format("%s/%s", keyBuilder.getBarrierParticipantsPath(version), participantId);
 zkUtils.getZkClient().createPersistent(participantNodePath);
}

代码示例来源:origin: org.apache.samza/samza-core_2.12

/**
 * Registers a participant with the shared barrier for the given version.
 *
 * <p>First subscribes to data changes on the barrier state path, then creates a persistent node
 * named after the participant beneath the barrier's participants path.
 *
 * @param version Version associated with the Barrier
 * @param participantId Identifier of the participant
 */
public void join(String version, String participantId) {
 LOG.info("Joining the barrier version: {} as participant: {}.", version, participantId);

 final String statePath = keyBuilder.getBarrierStatePath(version);
 LOG.info("Subscribing data changes on the path: {} for barrier version: {}.", statePath, version);
 final ZkBarrierReachedHandler stateChangeHandler = new ZkBarrierReachedHandler(statePath, version, zkUtils);
 zkUtils.subscribeDataChanges(statePath, stateChangeHandler);

 // TODO: Handle ZkNodeExistsException - SAMZA-1304
 final String participantNodePath =
   String.format("%s/%s", keyBuilder.getBarrierParticipantsPath(version), participantId);
 zkUtils.getZkClient().createPersistent(participantNodePath);
}

代码示例来源:origin: org.apache.samza/samza-core_2.10

/**
 * Registers a participant with the shared barrier for the given version.
 *
 * <p>First subscribes to data changes on the barrier state path, then creates a persistent node
 * named after the participant beneath the barrier's participants path.
 *
 * @param version Version associated with the Barrier
 * @param participantId Identifier of the participant
 */
public void join(String version, String participantId) {
 LOG.info("Joining the barrier version: {} as participant: {}.", version, participantId);

 final String statePath = keyBuilder.getBarrierStatePath(version);
 LOG.info("Subscribing data changes on the path: {} for barrier version: {}.", statePath, version);
 final ZkBarrierReachedHandler stateChangeHandler = new ZkBarrierReachedHandler(statePath, version, zkUtils);
 zkUtils.subscribeDataChanges(statePath, stateChangeHandler);

 // TODO: Handle ZkNodeExistsException - SAMZA-1304
 final String participantNodePath =
   String.format("%s/%s", keyBuilder.getBarrierParticipantsPath(version), participantId);
 zkUtils.getZkClient().createPersistent(participantNodePath);
}

代码示例来源:origin: apache/samza

@Test
public void testCleanUpZkBarrierVersion() {
 final String barrierRoot = zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix();
 zkUtils.getZkClient().createPersistent(barrierRoot, true);
 final ZkBarrierForVersionUpgrade versionBarrier = new ZkBarrierForVersionUpgrade(barrierRoot, zkUtils, null, null);

 // Create barriers for versions 200 through 209, each given the ids "<version>a", "<version>b", "<version>c".
 for (int version = 200; version <= 209; version++) {
  versionBarrier.create(
    String.valueOf(version),
    new ArrayList<>(Arrays.asList(version + "a", version + "b", version + "c")));
 }

 // After trimming to 5, only the five highest-numbered barriers are expected to remain.
 zkUtils.deleteOldBarrierVersions(5);

 final List<String> remainingBarriers = zkUtils.getZkClient().getChildren(barrierRoot);
 Collections.sort(remainingBarriers);
 final List<String> expectedBarriers =
   Arrays.asList("barrier_205", "barrier_206", "barrier_207", "barrier_208", "barrier_209");
 Assert.assertEquals(expectedBarriers, remainingBarriers);
}

代码示例来源:origin: apache/samza

@Test
public void testCleanUpZkJobModels() {
 String root = zkUtils.getKeyBuilder().getJobModelPathPrefix();
 System.out.println("root=" + root);
 zkUtils.getZkClient().createPersistent(root, true);
 // publish job model versions 101 through 109
 for (int i = 101; i < 110; i++) {
  zkUtils.publishJobModel(String.valueOf(i), null);
 }
 // clean all of the versions except 5 most recent ones
 zkUtils.deleteOldJobModels(5);
 // ZooKeeper gives no ordering guarantee for the children list, so sort before comparing against
 // the expected versions; otherwise the assertion depends on the order the server happens to return.
 java.util.List<String> remainingVersions = zkUtils.getZkClient().getChildren(root);
 java.util.Collections.sort(remainingVersions);
 Assert.assertEquals(Arrays.asList("105", "106", "107", "108", "109"), remainingVersions);
}

代码示例来源:origin: apache/samza

@After
public void testTeardown() {
 // Recursively delete everything under the key builder's root path, then close the test ZkUtils.
 final String rootPath = KEY_BUILDER.getRootPath();
 testZkUtils.getZkClient().deleteRecursive(rootPath);
 testZkUtils.close();
}

代码示例来源:origin: apache/samza

@After
public void testTeardown() {
 // Recursively delete everything under the key builder's root path, then close the test ZkUtils.
 final String rootPath = KEY_BUILDER.getRootPath();
 testZkUtils.getZkClient().deleteRecursive(rootPath);
 testZkUtils.close();
}

代码示例来源:origin: apache/samza

@Before
public void testSetup() {
 testZkConnectionString = "127.0.0.1:" + zkServer.getPort();
 try {
  testZkUtils = getZkUtilsWithNewClient();
 } catch (Exception e) {
  // Fail the setup with the original exception attached as the cause, so the reason the client
  // could not be created shows up in the test report instead of being discarded.
  throw new AssertionError("Client connection setup failed. Aborting tests..", e);
 }
 try {
  // Make sure the processors path (and any missing parents) exists before each test.
  testZkUtils.getZkClient().createPersistent(KEY_BUILDER.getProcessorsPath(), true);
 } catch (ZkNodeExistsException ignored) {
  // Intentionally ignored: the path already being present is acceptable for the tests.
 }
}
// used in the callbacks

相关文章