
x33g5p2x  于2022-02-05 转载在 其他  





代码示例来源:origin: apache/samza

 public void countDown() {
  // create persistent node
  String path = zkUtils.getZkClient().createPersistentSequential(latchPath + "/", participantId);
  LOG.debug("ZKProcessorLatch countDown created " + path);

代码示例来源:origin: apache/samza

public void await(long timeout, TimeUnit timeUnit) throws TimeoutException {
 // waitUntilExists signals timeout by returning false as opposed to throwing exception. We internally need to map
 // the non-existence to a TimeoutException in order to respect the contract defined in Latch interface
 boolean targetPathExists = zkUtils.getZkClient().waitUntilExists(targetPath, timeUnit, timeout);
 if (!targetPathExists) {
  throw new TimeoutException("Timed out waiting for the targetPath");

代码示例来源:origin: org.apache.samza/samza-core_2.11

public void await(long timeout, TimeUnit timeUnit) throws TimeoutException {
 // waitUntilExists signals timeout by returning false as opposed to throwing exception. We internally need to map
 // the non-existence to a TimeoutException in order to respect the contract defined in Latch interface
 boolean targetPathExists = zkUtils.getZkClient().waitUntilExists(targetPath, timeUnit, timeout);
 if (!targetPathExists) {
  throw new TimeoutException("Timed out waiting for the targetPath");

代码示例来源:origin: org.apache.samza/samza-core_2.12

public void await(long timeout, TimeUnit timeUnit) throws TimeoutException {
 // waitUntilExists signals timeout by returning false as opposed to throwing exception. We internally need to map
 // the non-existence to a TimeoutException in order to respect the contract defined in Latch interface
 boolean targetPathExists = zkUtils.getZkClient().waitUntilExists(targetPath, timeUnit, timeout);
 if (!targetPathExists) {
  throw new TimeoutException("Timed out waiting for the targetPath");

代码示例来源:origin: org.apache.samza/samza-core_2.12

 public void countDown() {
  // create persistent node
  String path = zkUtils.getZkClient().createPersistentSequential(latchPath + "/", participantId);
  LOG.debug("ZKProcessorLatch countDown created " + path);

代码示例来源:origin: org.apache.samza/samza-core_2.11

 public void countDown() {
  // create persistent node
  String path = zkUtils.getZkClient().createPersistentSequential(latchPath + "/", participantId);
  LOG.debug("ZKProcessorLatch countDown created " + path);

代码示例来源:origin: org.apache.samza/samza-core

 public void countDown() {
  // create persistent node
  String path = zkUtils.getZkClient().createPersistentSequential(latchPath + "/", participantId);
  LOG.debug("ZKProcessorLatch countDown created " + path);

代码示例来源:origin: org.apache.samza/samza-core_2.10

public void await(long timeout, TimeUnit timeUnit) throws TimeoutException {
 // waitUntilExists signals timeout by returning false as opposed to throwing exception. We internally need to map
 // the non-existence to a TimeoutException in order to respect the contract defined in Latch interface
 boolean targetPathExists = zkUtils.getZkClient().waitUntilExists(targetPath, timeUnit, timeout);
 if (!targetPathExists) {
  throw new TimeoutException("Timed out waiting for the targetPath");

代码示例来源:origin: org.apache.samza/samza-core

public void await(long timeout, TimeUnit timeUnit) throws TimeoutException {
 // waitUntilExists signals timeout by returning false as opposed to throwing exception. We internally need to map
 // the non-existence to a TimeoutException in order to respect the contract defined in Latch interface
 boolean targetPathExists = zkUtils.getZkClient().waitUntilExists(targetPath, timeUnit, timeout);
 if (!targetPathExists) {
  throw new TimeoutException("Timed out waiting for the targetPath");

代码示例来源:origin: org.apache.samza/samza-core_2.10

 public void countDown() {
  // create persistent node
  String path = zkUtils.getZkClient().createPersistentSequential(latchPath + "/", participantId);
  LOG.debug("ZKProcessorLatch countDown created " + path);

代码示例来源:origin: apache/samza

/**
 * Expires the barrier version by marking it as TIMED_OUT
 * @param version Version associated with the Barrier
 */
public void expire(String version) {
 String barrierStatePath = keyBuilder.getBarrierStatePath(version);
 State barrierState = zkUtils.getZkClient().readData(barrierStatePath);
 if (Objects.equals(barrierState, State.NEW)) {"Expiring the barrier version: %s. Marking the barrier state: %s as %s.", version, barrierStatePath, State.TIMED_OUT));
  zkUtils.writeData(keyBuilder.getBarrierStatePath(version), State.TIMED_OUT);
 } else {
  LOG.debug(String.format("Barrier version: %s is at: %s state. Not marking barrier as %s.", version, barrierState, State.TIMED_OUT));

代码示例来源:origin: org.apache.samza/samza-core_2.12

/**
 * Expires the barrier version by marking it as TIMED_OUT
 * @param version Version associated with the Barrier
 */
public void expire(String version) {
 String barrierStatePath = keyBuilder.getBarrierStatePath(version);
 State barrierState = zkUtils.getZkClient().readData(barrierStatePath);
 if (Objects.equals(barrierState, State.NEW)) {"Expiring the barrier version: %s. Marking the barrier state: %s as %s.", version, barrierStatePath, State.TIMED_OUT));
  zkUtils.writeData(keyBuilder.getBarrierStatePath(version), State.TIMED_OUT);
 } else {
  LOG.debug(String.format("Barrier version: %s is at: %s state. Not marking barrier as %s.", version, barrierState, State.TIMED_OUT));

代码示例来源:origin: apache/samza

/**
 * Joins a shared barrier by registering under the barrier sub-tree in ZK
 * @param version Version associated with the Barrier
 * @param participantId Identifier of the participant
 */
public void join(String version, String participantId) {"Joining the barrier version: {} as participant: {}.", version, participantId);
 String barrierStatePath = keyBuilder.getBarrierStatePath(version);"Subscribing data changes on the path: {} for barrier version: {}.", barrierStatePath, version);
 zkUtils.subscribeDataChanges(barrierStatePath, new ZkBarrierReachedHandler(barrierStatePath, version, zkUtils));
 // TODO: Handle ZkNodeExistsException - SAMZA-1304
   String.format("%s/%s", keyBuilder.getBarrierParticipantsPath(version), participantId));

代码示例来源:origin: org.apache.samza/samza-core_2.12

/**
 * Joins a shared barrier by registering under the barrier sub-tree in ZK
 * @param version Version associated with the Barrier
 * @param participantId Identifier of the participant
 */
public void join(String version, String participantId) {"Joining the barrier version: {} as participant: {}.", version, participantId);
 String barrierStatePath = keyBuilder.getBarrierStatePath(version);"Subscribing data changes on the path: {} for barrier version: {}.", barrierStatePath, version);
 zkUtils.subscribeDataChanges(barrierStatePath, new ZkBarrierReachedHandler(barrierStatePath, version, zkUtils));
 // TODO: Handle ZkNodeExistsException - SAMZA-1304
   String.format("%s/%s", keyBuilder.getBarrierParticipantsPath(version), participantId));

代码示例来源:origin: org.apache.samza/samza-core_2.10

/**
 * Joins a shared barrier by registering under the barrier sub-tree in ZK
 * @param version Version associated with the Barrier
 * @param participantId Identifier of the participant
 */
public void join(String version, String participantId) {"Joining the barrier version: {} as participant: {}.", version, participantId);
 String barrierStatePath = keyBuilder.getBarrierStatePath(version);"Subscribing data changes on the path: {} for barrier version: {}.", barrierStatePath, version);
 zkUtils.subscribeDataChanges(barrierStatePath, new ZkBarrierReachedHandler(barrierStatePath, version, zkUtils));
 // TODO: Handle ZkNodeExistsException - SAMZA-1304
   String.format("%s/%s", keyBuilder.getBarrierParticipantsPath(version), participantId));

代码示例来源:origin: apache/samza

public void testCleanUpZkBarrierVersion() {
 String root = zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix();
 zkUtils.getZkClient().createPersistent(root, true);
 ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(root, zkUtils, null, null);
 for (int i = 200; i < 210; i++) {
  barrier.create(String.valueOf(i), new ArrayList<>(Arrays.asList(i + "a", i + "b", i + "c")));
 List<String> zNodeIds = zkUtils.getZkClient().getChildren(root);
 Assert.assertEquals(Arrays.asList("barrier_205", "barrier_206", "barrier_207", "barrier_208", "barrier_209"),

代码示例来源:origin: apache/samza

public void testCleanUpZkJobModels() {
 String root = zkUtils.getKeyBuilder().getJobModelPathPrefix();
 System.out.println("root=" + root);
 zkUtils.getZkClient().createPersistent(root, true);
 // generate multiple version
 for (int i = 101; i < 110; i++) {
  zkUtils.publishJobModel(String.valueOf(i), null);
 // clean all of the versions except 5 most recent ones
 Assert.assertEquals(Arrays.asList("105", "106", "107", "108", "109"), zkUtils.getZkClient().getChildren(root));

代码示例来源:origin: apache/samza

public void testTeardown() {

代码示例来源:origin: apache/samza

public void testTeardown() {

代码示例来源:origin: apache/samza

public void testSetup() {
 testZkConnectionString = "" + zkServer.getPort();
 try {
  testZkUtils = getZkUtilsWithNewClient();
 } catch (Exception e) {"Client connection setup failed. Aborting tests..");
 try {
  testZkUtils.getZkClient().createPersistent(KEY_BUILDER.getProcessorsPath(), true);
 } catch (ZkNodeExistsException e) {
  // Do nothing
// used in the callbacks
