本文整理了 Java 中 org.apache.samza.zk.ZkUtils.getZkClient() 方法的一些代码示例,展示了 ZkUtils.getZkClient() 的具体用法。
这些代码示例主要来源于 GitHub、Stack Overflow、Maven 等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。
ZkUtils.getZkClient() 方法的具体详情如下:
包路径:org.apache.samza.zk.ZkUtils
类名称:ZkUtils
方法名:getZkClient
暂无
代码示例来源:origin: apache/samza
/**
 * Counts down the latch by creating a persistent sequential node under {@code latchPath}.
 *
 * <p>{@code participantId} is handed to {@code createPersistentSequential} as its second argument,
 * and the path returned by that call is logged at DEBUG level.
 */
@Override
public void countDown() {
  // Children of the latch are created directly below "<latchPath>/".
  final String latchChildPrefix = latchPath + "/";
  final String createdNodePath =
      zkUtils.getZkClient().createPersistentSequential(latchChildPrefix, participantId);
  LOG.debug("ZKProcessorLatch countDown created " + createdNodePath);
}
}
代码示例来源:origin: apache/samza
/**
 * Blocks until {@code targetPath} exists or the given timeout elapses.
 *
 * <p>{@code waitUntilExists} reports a timeout by returning {@code false} instead of throwing, so a
 * {@code false} result is translated into a {@link TimeoutException} here in order to respect the
 * contract defined in the Latch interface.
 *
 * @param timeout maximum time to wait, expressed in {@code timeUnit}
 * @param timeUnit unit of {@code timeout}
 * @throws TimeoutException if {@code waitUntilExists} returns {@code false}
 */
@Override
public void await(long timeout, TimeUnit timeUnit) throws TimeoutException {
  if (zkUtils.getZkClient().waitUntilExists(targetPath, timeUnit, timeout)) {
    // The target path showed up in time; nothing else to do.
    return;
  }
  throw new TimeoutException("Timed out waiting for the targetPath");
}
代码示例来源:origin: org.apache.samza/samza-core_2.11
/**
 * Blocks until {@code targetPath} exists or the given timeout elapses.
 *
 * <p>{@code waitUntilExists} reports a timeout by returning {@code false} instead of throwing, so a
 * {@code false} result is translated into a {@link TimeoutException} here in order to respect the
 * contract defined in the Latch interface.
 *
 * @param timeout maximum time to wait, expressed in {@code timeUnit}
 * @param timeUnit unit of {@code timeout}
 * @throws TimeoutException if {@code waitUntilExists} returns {@code false}
 */
@Override
public void await(long timeout, TimeUnit timeUnit) throws TimeoutException {
  if (zkUtils.getZkClient().waitUntilExists(targetPath, timeUnit, timeout)) {
    // The target path showed up in time; nothing else to do.
    return;
  }
  throw new TimeoutException("Timed out waiting for the targetPath");
}
代码示例来源:origin: org.apache.samza/samza-core_2.12
/**
 * Blocks until {@code targetPath} exists or the given timeout elapses.
 *
 * <p>{@code waitUntilExists} reports a timeout by returning {@code false} instead of throwing, so a
 * {@code false} result is translated into a {@link TimeoutException} here in order to respect the
 * contract defined in the Latch interface.
 *
 * @param timeout maximum time to wait, expressed in {@code timeUnit}
 * @param timeUnit unit of {@code timeout}
 * @throws TimeoutException if {@code waitUntilExists} returns {@code false}
 */
@Override
public void await(long timeout, TimeUnit timeUnit) throws TimeoutException {
  if (zkUtils.getZkClient().waitUntilExists(targetPath, timeUnit, timeout)) {
    // The target path showed up in time; nothing else to do.
    return;
  }
  throw new TimeoutException("Timed out waiting for the targetPath");
}
代码示例来源:origin: org.apache.samza/samza-core_2.12
/**
 * Counts down the latch by creating a persistent sequential node under {@code latchPath}.
 *
 * <p>{@code participantId} is handed to {@code createPersistentSequential} as its second argument,
 * and the path returned by that call is logged at DEBUG level.
 */
@Override
public void countDown() {
  // Children of the latch are created directly below "<latchPath>/".
  final String latchChildPrefix = latchPath + "/";
  final String createdNodePath =
      zkUtils.getZkClient().createPersistentSequential(latchChildPrefix, participantId);
  LOG.debug("ZKProcessorLatch countDown created " + createdNodePath);
}
}
代码示例来源:origin: org.apache.samza/samza-core_2.11
/**
 * Counts down the latch by creating a persistent sequential node under {@code latchPath}.
 *
 * <p>{@code participantId} is handed to {@code createPersistentSequential} as its second argument,
 * and the path returned by that call is logged at DEBUG level.
 */
@Override
public void countDown() {
  // Children of the latch are created directly below "<latchPath>/".
  final String latchChildPrefix = latchPath + "/";
  final String createdNodePath =
      zkUtils.getZkClient().createPersistentSequential(latchChildPrefix, participantId);
  LOG.debug("ZKProcessorLatch countDown created " + createdNodePath);
}
}
代码示例来源:origin: org.apache.samza/samza-core
/**
 * Counts down the latch by creating a persistent sequential node under {@code latchPath}.
 *
 * <p>{@code participantId} is handed to {@code createPersistentSequential} as its second argument,
 * and the path returned by that call is logged at DEBUG level.
 */
@Override
public void countDown() {
  // Children of the latch are created directly below "<latchPath>/".
  final String latchChildPrefix = latchPath + "/";
  final String createdNodePath =
      zkUtils.getZkClient().createPersistentSequential(latchChildPrefix, participantId);
  LOG.debug("ZKProcessorLatch countDown created " + createdNodePath);
}
}
代码示例来源:origin: org.apache.samza/samza-core_2.10
/**
 * Blocks until {@code targetPath} exists or the given timeout elapses.
 *
 * <p>{@code waitUntilExists} reports a timeout by returning {@code false} instead of throwing, so a
 * {@code false} result is translated into a {@link TimeoutException} here in order to respect the
 * contract defined in the Latch interface.
 *
 * @param timeout maximum time to wait, expressed in {@code timeUnit}
 * @param timeUnit unit of {@code timeout}
 * @throws TimeoutException if {@code waitUntilExists} returns {@code false}
 */
@Override
public void await(long timeout, TimeUnit timeUnit) throws TimeoutException {
  if (zkUtils.getZkClient().waitUntilExists(targetPath, timeUnit, timeout)) {
    // The target path showed up in time; nothing else to do.
    return;
  }
  throw new TimeoutException("Timed out waiting for the targetPath");
}
代码示例来源:origin: org.apache.samza/samza-core
/**
 * Blocks until {@code targetPath} exists or the given timeout elapses.
 *
 * <p>{@code waitUntilExists} reports a timeout by returning {@code false} instead of throwing, so a
 * {@code false} result is translated into a {@link TimeoutException} here in order to respect the
 * contract defined in the Latch interface.
 *
 * @param timeout maximum time to wait, expressed in {@code timeUnit}
 * @param timeUnit unit of {@code timeout}
 * @throws TimeoutException if {@code waitUntilExists} returns {@code false}
 */
@Override
public void await(long timeout, TimeUnit timeUnit) throws TimeoutException {
  if (zkUtils.getZkClient().waitUntilExists(targetPath, timeUnit, timeout)) {
    // The target path showed up in time; nothing else to do.
    return;
  }
  throw new TimeoutException("Timed out waiting for the targetPath");
}
代码示例来源:origin: org.apache.samza/samza-core_2.10
/**
 * Counts down the latch by creating a persistent sequential node under {@code latchPath}.
 *
 * <p>{@code participantId} is handed to {@code createPersistentSequential} as its second argument,
 * and the path returned by that call is logged at DEBUG level.
 */
@Override
public void countDown() {
  // Children of the latch are created directly below "<latchPath>/".
  final String latchChildPrefix = latchPath + "/";
  final String createdNodePath =
      zkUtils.getZkClient().createPersistentSequential(latchChildPrefix, participantId);
  LOG.debug("ZKProcessorLatch countDown created " + createdNodePath);
}
}
代码示例来源:origin: apache/samza
/**
 * Expires the barrier version by marking it as TIMED_OUT.
 *
 * <p>The state is only overwritten when the barrier is currently in the {@code NEW} state; for any
 * other state (including a missing/null one) the barrier is left untouched and a DEBUG message is
 * logged instead.
 *
 * <p>NOTE(review): the state is read and then written in two separate calls, so it could change in
 * between — confirm whether callers tolerate that.
 *
 * @param version Version associated with the Barrier
 */
public void expire(String version) {
  final String barrierStatePath = keyBuilder.getBarrierStatePath(version);
  final State barrierState = zkUtils.getZkClient().readData(barrierStatePath);

  if (!Objects.equals(barrierState, State.NEW)) {
    // Placeholders avoid building the message when DEBUG logging is disabled.
    LOG.debug("Barrier version: {} is at: {} state. Not marking barrier as {}.",
        version, barrierState, State.TIMED_OUT);
    return;
  }

  LOG.info("Expiring the barrier version: {}. Marking the barrier state: {} as {}.",
      version, barrierStatePath, State.TIMED_OUT);
  // Reuse the path resolved above instead of asking the key builder for it a second time.
  zkUtils.writeData(barrierStatePath, State.TIMED_OUT);
}
代码示例来源:origin: org.apache.samza/samza-core_2.12
/**
 * Expires the barrier version by marking it as TIMED_OUT.
 *
 * <p>The state is only overwritten when the barrier is currently in the {@code NEW} state; for any
 * other state (including a missing/null one) the barrier is left untouched and a DEBUG message is
 * logged instead.
 *
 * <p>NOTE(review): the state is read and then written in two separate calls, so it could change in
 * between — confirm whether callers tolerate that.
 *
 * @param version Version associated with the Barrier
 */
public void expire(String version) {
  final String barrierStatePath = keyBuilder.getBarrierStatePath(version);
  final State barrierState = zkUtils.getZkClient().readData(barrierStatePath);

  if (!Objects.equals(barrierState, State.NEW)) {
    // Placeholders avoid building the message when DEBUG logging is disabled.
    LOG.debug("Barrier version: {} is at: {} state. Not marking barrier as {}.",
        version, barrierState, State.TIMED_OUT);
    return;
  }

  LOG.info("Expiring the barrier version: {}. Marking the barrier state: {} as {}.",
      version, barrierStatePath, State.TIMED_OUT);
  // Reuse the path resolved above instead of asking the key builder for it a second time.
  zkUtils.writeData(barrierStatePath, State.TIMED_OUT);
}
代码示例来源:origin: apache/samza
/**
 * Joins a shared barrier: subscribes to data changes on the barrier state path and then registers
 * the participant under the barrier participants path in ZK.
 *
 * @param version Version associated with the Barrier
 * @param participantId Identifier of the participant
 */
public void join(String version, String participantId) {
  LOG.info("Joining the barrier version: {} as participant: {}.", version, participantId);

  // Subscribe to state changes first, before the participant node is created.
  final String statePath = keyBuilder.getBarrierStatePath(version);
  LOG.info("Subscribing data changes on the path: {} for barrier version: {}.", statePath, version);
  final ZkBarrierReachedHandler reachedHandler = new ZkBarrierReachedHandler(statePath, version, zkUtils);
  zkUtils.subscribeDataChanges(statePath, reachedHandler);

  // TODO: Handle ZkNodeExistsException - SAMZA-1304
  final String participantNode =
      String.format("%s/%s", keyBuilder.getBarrierParticipantsPath(version), participantId);
  zkUtils.getZkClient().createPersistent(participantNode);
}
代码示例来源:origin: org.apache.samza/samza-core_2.12
/**
 * Joins a shared barrier: subscribes to data changes on the barrier state path and then registers
 * the participant under the barrier participants path in ZK.
 *
 * @param version Version associated with the Barrier
 * @param participantId Identifier of the participant
 */
public void join(String version, String participantId) {
  LOG.info("Joining the barrier version: {} as participant: {}.", version, participantId);

  // Subscribe to state changes first, before the participant node is created.
  final String statePath = keyBuilder.getBarrierStatePath(version);
  LOG.info("Subscribing data changes on the path: {} for barrier version: {}.", statePath, version);
  final ZkBarrierReachedHandler reachedHandler = new ZkBarrierReachedHandler(statePath, version, zkUtils);
  zkUtils.subscribeDataChanges(statePath, reachedHandler);

  // TODO: Handle ZkNodeExistsException - SAMZA-1304
  final String participantNode =
      String.format("%s/%s", keyBuilder.getBarrierParticipantsPath(version), participantId);
  zkUtils.getZkClient().createPersistent(participantNode);
}
代码示例来源:origin: org.apache.samza/samza-core_2.10
/**
 * Joins a shared barrier: subscribes to data changes on the barrier state path and then registers
 * the participant under the barrier participants path in ZK.
 *
 * @param version Version associated with the Barrier
 * @param participantId Identifier of the participant
 */
public void join(String version, String participantId) {
  LOG.info("Joining the barrier version: {} as participant: {}.", version, participantId);

  // Subscribe to state changes first, before the participant node is created.
  final String statePath = keyBuilder.getBarrierStatePath(version);
  LOG.info("Subscribing data changes on the path: {} for barrier version: {}.", statePath, version);
  final ZkBarrierReachedHandler reachedHandler = new ZkBarrierReachedHandler(statePath, version, zkUtils);
  zkUtils.subscribeDataChanges(statePath, reachedHandler);

  // TODO: Handle ZkNodeExistsException - SAMZA-1304
  final String participantNode =
      String.format("%s/%s", keyBuilder.getBarrierParticipantsPath(version), participantId);
  zkUtils.getZkClient().createPersistent(participantNode);
}
代码示例来源:origin: apache/samza
/**
 * Creates barrier versions 200 through 209, calls {@code deleteOldBarrierVersions(5)} and checks
 * that only barrier_205 .. barrier_209 remain under the barrier root.
 */
@Test
public void testCleanUpZkBarrierVersion() {
  final String barrierRoot = zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix();
  zkUtils.getZkClient().createPersistent(barrierRoot, true);

  final ZkBarrierForVersionUpgrade versionBarrier =
      new ZkBarrierForVersionUpgrade(barrierRoot, zkUtils, null, null);
  int version = 200;
  while (version < 210) {
    // Each version is created with the three entries "<version>a", "<version>b" and "<version>c".
    versionBarrier.create(
        String.valueOf(version),
        new ArrayList<>(Arrays.asList(version + "a", version + "b", version + "c")));
    version++;
  }

  zkUtils.deleteOldBarrierVersions(5);

  // Sort the children so the comparison does not depend on the order in which they are returned.
  final List<String> remainingBarriers = zkUtils.getZkClient().getChildren(barrierRoot);
  Collections.sort(remainingBarriers);
  final List<String> expectedBarriers =
      Arrays.asList("barrier_205", "barrier_206", "barrier_207", "barrier_208", "barrier_209");
  Assert.assertEquals(expectedBarriers, remainingBarriers);
}
代码示例来源:origin: apache/samza
/**
 * Publishes job model versions 101 through 109, calls {@code deleteOldJobModels(5)} and checks
 * that only versions 105 .. 109 remain under the job model root.
 */
@Test
public void testCleanUpZkJobModels() {
  String root = zkUtils.getKeyBuilder().getJobModelPathPrefix();
  zkUtils.getZkClient().createPersistent(root, true);
  // generate multiple versions
  for (int i = 101; i < 110; i++) {
    zkUtils.publishJobModel(String.valueOf(i), null);
  }
  // clean all of the versions except 5 most recent ones
  zkUtils.deleteOldJobModels(5);

  // ZooKeeper gives no ordering guarantee for the children it returns, so sort them before
  // comparing; otherwise the assertion can fail even though the right nodes were kept.
  String[] remainingVersions = zkUtils.getZkClient().getChildren(root).toArray(new String[0]);
  Arrays.sort(remainingVersions);
  Assert.assertEquals(Arrays.asList("105", "106", "107", "108", "109"), Arrays.asList(remainingVersions));
}
代码示例来源:origin: apache/samza
/**
 * Cleans up after each test: recursively deletes everything under the key builder's root path and
 * closes the test ZkUtils.
 */
@After
public void testTeardown() {
  try {
    testZkUtils.getZkClient().deleteRecursive(KEY_BUILDER.getRootPath());
  } finally {
    // Always close, even when the delete fails, so a failing cleanup does not leak the client
    // into the tests that run afterwards.
    testZkUtils.close();
  }
}
代码示例来源:origin: apache/samza
/**
 * Cleans up after each test: recursively deletes everything under the key builder's root path and
 * closes the test ZkUtils.
 */
@After
public void testTeardown() {
  try {
    testZkUtils.getZkClient().deleteRecursive(KEY_BUILDER.getRootPath());
  } finally {
    // Always close, even when the delete fails, so a failing cleanup does not leak the client
    // into the tests that run afterwards.
    testZkUtils.close();
  }
}
代码示例来源:origin: apache/samza
/**
 * Prepares each test: builds the connection string from 127.0.0.1 and the port reported by
 * {@code zkServer}, obtains a ZkUtils via {@code getZkUtilsWithNewClient()} and makes sure the
 * processors path exists.
 */
@Before
public void testSetup() {
  testZkConnectionString = "127.0.0.1:" + zkServer.getPort();
  try {
    testZkUtils = getZkUtilsWithNewClient();
  } catch (Exception e) {
    // Fail with the same message as Assert.fail would, but keep the cause so the underlying
    // connection problem shows up in the test report.
    throw new AssertionError("Client connection setup failed. Aborting tests..", e);
  }
  try {
    testZkUtils.getZkClient().createPersistent(KEY_BUILDER.getProcessorsPath(), true);
  } catch (ZkNodeExistsException ignored) {
    // The processors path is already there, which is all the setup needs.
  }
}
// used in the callbacks
内容来源于网络,如有侵权,请联系作者删除!