Usage and code examples of the org.apache.samza.zk.ZkKeyBuilder class

Reposted by x33g5p2x on 2022-02-05 under Other

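This article collects code examples for the Java class org.apache.samza.zk.ZkKeyBuilder and shows how the class is used. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the ZkKeyBuilder class are as follows:
Package path: org.apache.samza.zk.ZkKeyBuilder
Class name: ZkKeyBuilder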

Introduction to ZkKeyBuilder

The following ZK hierarchy is maintained for Standalone jobs:

- /
   |- groupId/
       |- JobModelGeneration/
           |- jobModelVersion (data contains the version)
           |- jobModelUpgradeBarrier/ (contains barrier related data)
           |- jobModels/
               |- 1 (contains job model version 1 as data)
               |- 2
       |- processors/
           |- 00000001
           |- 00000002
           |- ...

Note: ZK node levels without a trailing forward slash ('/') are leaf nodes; levels with one are non-leaf nodes. This class provides helper methods to generate and parse paths in the ZK hierarchy.
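
The layout above can be sketched as a small, dependency-free path builder. This is only an illustration under stated assumptions, not Samza's implementation: the class name ZkPathSketch and its method names are hypothetical, and the node names (JobModelGeneration, jobModelVersion, jobModels, processors) are copied from the hierarchy shown above rather than read from Samza's constants.

```java
/** Hypothetical sketch of the path layout described above; not Samza code. */
final class ZkPathSketch {
  // Node names assumed from the hierarchy diagram.
  static final String JOBMODEL_GENERATION = "JobModelGeneration";
  static final String PROCESSORS = "processors";

  private final String groupId;

  ZkPathSketch(String groupId) {
    // Reject null or blank prefixes, since they would produce malformed paths.
    if (groupId == null || groupId.trim().isEmpty()) {
      throw new IllegalArgumentException("groupId must not be null or blank");
    }
    this.groupId = groupId;
  }

  String rootPath() {
    return "/" + groupId;
  }

  String processorsPath() {
    return rootPath() + "/" + PROCESSORS;
  }

  String jobModelVersionPath() {
    return rootPath() + "/" + JOBMODEL_GENERATION + "/jobModelVersion";
  }

  String jobModelPathPrefix() {
    return rootPath() + "/" + JOBMODEL_GENERATION + "/jobModels";
  }

  String jobModelPath(String version) {
    return jobModelPathPrefix() + "/" + version;
  }

  public static void main(String[] args) {
    ZkPathSketch paths = new ZkPathSketch("test");
    System.out.println(paths.processorsPath());   // prints /test/processors
    System.out.println(paths.jobModelPath("2"));  // prints /test/JobModelGeneration/jobModels/2
  }
}
```

Keeping all path construction in one class means callers never concatenate node names by hand, which is the role the hierarchy description assigns to ZkKeyBuilder.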

Code examples

Code example source: origin: apache/samza

/**
 * get the job model from ZK by version
 * @param jobModelVersion jobModel version to get
 * @return job model for this version
 */
public JobModel getJobModel(String jobModelVersion) {
 LOG.info("Read the model ver=" + jobModelVersion + " from " + keyBuilder.getJobModelPath(jobModelVersion));
 Object data = zkClient.readData(keyBuilder.getJobModelPath(jobModelVersion));
 metrics.reads.inc();
 ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
 JobModel jm;
 try {
  jm = mmapper.readValue((String) data, JobModel.class);
 } catch (IOException e) {
  throw new SamzaException("failed to read JobModel from ZK", e);
 }
 return jm;
}

Code example source: origin: apache/samza

@Override
public void start() {
 ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder();
 zkUtils.validateZkVersion();
 zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder.getJobModelPathPrefix(), keyBuilder.getTaskLocalityPath()});
 systemAdmins.start();
 leaderElector.tryBecomeLeader();
 zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler(zkUtils));
}

Code example source: origin: apache/samza

void deleteOldBarrierVersions(int numVersionsToLeave) {
 // read current list of barriers
 String path = keyBuilder.getJobModelVersionBarrierPrefix();
 LOG.info("About to delete old barrier paths from " + path);
 List<String> znodeIds = zkClient.getChildren(path);
 LOG.info("List of all zkNodes: " + znodeIds);
 deleteOldVersionPath(path, znodeIds, numVersionsToLeave,  new Comparator<String>() {
  @Override
  public int compare(String o1, String o2) {
   // barrier's name format is barrier_<num>
   return ZkBarrierForVersionUpgrade.getVersion(o1) - ZkBarrierForVersionUpgrade.getVersion(o2);
  }
 });
}
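
The ordering step above can be illustrated in isolation. The sketch below assumes names shaped like barrier_<num>, as the comment in the snippet states, and assumes that pruning means deleting the oldest entries until only the requested number remains; the body of deleteOldVersionPath is not shown in the source, so that part is a guess. BarrierPruneSketch, versionOf, and selectForDeletion are hypothetical names. It uses Comparator.comparingInt instead of subtraction, which avoids integer overflow for extreme values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of version-ordered pruning; not Samza code. */
final class BarrierPruneSketch {
  /** Parses the numeric suffix of a name shaped like barrier_<num>. */
  static int versionOf(String barrierName) {
    return Integer.parseInt(barrierName.substring(barrierName.lastIndexOf('_') + 1));
  }

  /** Returns the oldest names, i.e. those to delete so that at most numToKeep remain. */
  static List<String> selectForDeletion(List<String> names, int numToKeep) {
    List<String> sorted = new ArrayList<>(names);
    sorted.sort(Comparator.comparingInt(BarrierPruneSketch::versionOf));
    // Clamp so that a negative or oversized numToKeep cannot produce an invalid range.
    int deleteCount = Math.min(sorted.size(), Math.max(0, sorted.size() - numToKeep));
    return new ArrayList<>(sorted.subList(0, deleteCount));
  }

  public static void main(String[] args) {
    List<String> names = Arrays.asList("barrier_10", "barrier_2", "barrier_9");
    System.out.println(selectForDeletion(names, 2)); // prints [barrier_2]
  }
}
```

Sorting by the parsed number matters here: a plain string sort would place barrier_10 before barrier_2 and delete the wrong entry.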

Code example source: origin: apache/samza

@Test
public void testJobModelPath() {
 ZkKeyBuilder builder = new ZkKeyBuilder("test");

 Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/jobModelVersion", builder.getJobModelVersionPath());
 Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/jobModels", builder.getJobModelPathPrefix());
 String version = "2";
 Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/jobModels/" + version, builder.getJobModelPath(version));
 Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/" + ZkKeyBuilder.JOB_MODEL_UPGRADE_BARRIER_PATH + "/versionBarriers", builder.getJobModelVersionBarrierPrefix());
}

Code example source: origin: org.apache.samza/samza-core_2.12

@Override
public void start() {
 ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder();
 zkUtils.validateZkVersion();
 zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder
   .getJobModelPathPrefix()});
 startMetrics();
 systemAdmins.start();
 leaderElector.tryBecomeLeader();
 zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler(zkUtils));
}

Code example source: origin: apache/samza

@Test
public void testgetNextJobModelVersion() {
 // Set up the Zk base paths for testing.
 ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test");
 String root = keyBuilder.getRootPath();
 zkClient.deleteRecursive(root);
 zkUtils.validatePaths(new String[]{root, keyBuilder.getJobModelPathPrefix(), keyBuilder.getJobModelVersionPath()});
 String version = "1";
 String oldVersion = "0";
 // Set zkNode JobModelVersion to 1.
 zkUtils.publishJobModelVersion(oldVersion, version);
 Assert.assertEquals(version, zkUtils.getJobModelVersion());
 // Publish JobModel with a higher version (2).
 zkUtils.publishJobModel("2", new JobModel(new MapConfig(), new HashMap<>()));
 // The next JobModel version should be 3, because version 2 has already been published.
 Assert.assertEquals("3", zkUtils.getNextJobModelVersion(zkUtils.getJobModelVersion()));
}
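
The behavior this test checks can be sketched without ZooKeeper. The logic below is an assumption inferred only from the test (current version "1", a published model "2", expected next version "3"): take the larger of the current version and the highest published version, then add one. NextVersionSketch and nextVersion are hypothetical names and do not reflect how ZkUtils is actually written.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of next-version selection; not Samza code. */
final class NextVersionSketch {
  /**
   * Returns one more than the larger of the current version and the
   * highest published job model version (assumed rule, see lead-in).
   */
  static String nextVersion(String currentVersion, List<String> publishedVersions) {
    int highest = Integer.parseInt(currentVersion);
    for (String version : publishedVersions) {
      highest = Math.max(highest, Integer.parseInt(version));
    }
    return Integer.toString(highest + 1);
  }

  public static void main(String[] args) {
    System.out.println(nextVersion("1", Arrays.asList("2"))); // prints 3
  }
}
```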

Code example source: origin: apache/samza

int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(currentPath));
 if (currentSubscription != null) {
  LOG.debug(zLog("Unsubscribing data change for " + currentSubscription));
  zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
    previousProcessorChangeListener);
  previousProcessorChangeListener = new PreviousProcessorChangeListener(zkUtils);
 zkUtils.subscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
   previousProcessorChangeListener);
boolean predecessorExists = zkUtils.exists(keyBuilder.getProcessorsPath() + "/" + currentSubscription);
if (predecessorExists) {
 LOG.info(zLog("Predecessor still exists. Current subscription is valid. Continuing as non-leader."));
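
The excerpt above locates the current processor among the children and watches another processor's node. A common way to pick that node, used in ZooKeeper-style leader election, is to watch the entry immediately before one's own in sorted order. The sketch below shows only that lookup, under the assumption that the first entry in sorted order acts as leader; PredecessorSketch and predecessorOf are hypothetical names, and no ZooKeeper calls are made.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of predecessor lookup for leader election; not Samza code. */
final class PredecessorSketch {
  /**
   * Returns the id immediately before ownId in sorted order,
   * or null when ownId sorts first (assumed here to mean "leader").
   */
  static String predecessorOf(List<String> children, String ownId) {
    List<String> sorted = new ArrayList<>(children);
    Collections.sort(sorted);
    int index = sorted.indexOf(ownId);
    if (index < 0) {
      throw new IllegalStateException("own id not found among children: " + ownId);
    }
    return index == 0 ? null : sorted.get(index - 1);
  }

  public static void main(String[] args) {
    List<String> children = Arrays.asList("00000003", "00000001", "00000002");
    System.out.println(predecessorOf(children, "00000003")); // prints 00000002
    System.out.println(predecessorOf(children, "00000001")); // prints null
  }
}
```

Watching only the predecessor, rather than the leader node, means each node's disappearance wakes a single follower.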

Code example source: origin: apache/samza

String getTaskLocalityPath() {
 return String.format("%s/%s", getRootPath(), TASK_LOCALITY_PATH);
}

Code example source: origin: apache/samza

@Test(expected = IllegalArgumentException.class)
public void pathPrefixCannotBeEmpty() {
 new ZkKeyBuilder("    ");
}

Code example source: origin: apache/samza

/**
 * Subscribes to changes of the JobModel version.
 * @param dataListener listener notified when the data at the JobModel version path changes
 */
public void subscribeToJobModelVersionChange(GenerationAwareZkDataListener dataListener) {
 LOG.info(" subscribing for jm version change at:" + keyBuilder.getJobModelVersionPath());
 zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), dataListener);
 metrics.subscriptions.inc();
}

Code example source: origin: apache/samza

/**
 * subscribe to the changes in the list of processors in ZK
 * @param listener - will be called when a processor is added or removed.
 */
public void subscribeToProcessorChange(IZkChildListener listener) {
 LOG.info("Subscribing for child change at:" + keyBuilder.getProcessorsPath());
 zkClient.subscribeChildChanges(keyBuilder.getProcessorsPath(), listener);
 metrics.subscriptions.inc();
}

Code example source: origin: apache/samza

String getJobModelPath(String jobModelVersion) {
 return String.format("%s/%s", getJobModelPathPrefix(), jobModelVersion);
}

Code example source: origin: apache/samza

@Test
public void testParseIdFromPath() {
 Assert.assertEquals(
   "1",
   ZkKeyBuilder.parseIdFromPath("/test/processors/" + "1"));
 Assert.assertNull(ZkKeyBuilder.parseIdFromPath(null));
 Assert.assertNull(ZkKeyBuilder.parseIdFromPath(""));
}
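
One implementation that would satisfy the three assertions above is to return everything after the last '/' and to return null for null or empty input. This is a sketch consistent with the test only; the real method body is not shown in this article, and ParseIdSketch is a hypothetical name.

```java
/** Hypothetical sketch consistent with the test above; not Samza code. */
final class ParseIdSketch {
  /** Returns the last path segment, or null for null or empty input. */
  static String parseIdFromPath(String path) {
    if (path == null || path.isEmpty()) {
      return null;
    }
    // lastIndexOf returns -1 when there is no '/', so the whole string is returned.
    return path.substring(path.lastIndexOf('/') + 1);
  }

  public static void main(String[] args) {
    System.out.println(parseIdFromPath("/test/processors/1")); // prints 1
    System.out.println(parseIdFromPath(""));                   // prints null
  }
}
```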

Code example source: origin: apache/samza

@Test
public void testProcessorsPath() {
 ZkKeyBuilder builder = new ZkKeyBuilder("test");
 Assert.assertEquals("/test/" + ZkKeyBuilder.PROCESSORS_PATH, builder.getProcessorsPath());
}

Code example source: origin: apache/samza

String getJobModelPathPrefix() {
 return String.format("%s/%s/jobModels", getRootPath(), JOBMODEL_GENERATION_PATH);
}

Code example source: origin: apache/samza

@Test(expected = IllegalArgumentException.class)
public void pathPrefixCannotBeNull() {
 new ZkKeyBuilder(null);
}

Code example source: origin: apache/samza

/**
 * Method is used to get the <i>sorted</i> list of currently active/registered processors (znodes)
 *
 * @return List of absolute ZK node paths
 */
public List<String> getSortedActiveProcessorsZnodes() {
 List<String> znodeIds = zkClient.getChildren(keyBuilder.getProcessorsPath());
 if (znodeIds.size() > 0) {
  Collections.sort(znodeIds);
  LOG.info("Found these children - " + znodeIds);
 }
 return znodeIds;
}
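
Collections.sort orders strings lexicographically, so the method above yields numeric order only when node names are zero-padded to a fixed width. The sketch below demonstrates the difference. It assumes the 8-digit width seen in the hierarchy example at the top of this article; ZnodeSortSketch, pad, and sortedCopy are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical demonstration of lexicographic sorting of node names; not Samza code. */
final class ZnodeSortSketch {
  /** Zero-pads a sequence number to 8 digits (width assumed from the article's example). */
  static String pad(int sequence) {
    return String.format("%08d", sequence);
  }

  /** Returns a lexicographically sorted copy, leaving the input untouched. */
  static List<String> sortedCopy(List<String> ids) {
    List<String> copy = new ArrayList<>(ids);
    Collections.sort(copy);
    return copy;
  }

  public static void main(String[] args) {
    // Unpadded names sort as text, not as numbers.
    System.out.println(sortedCopy(Arrays.asList("10", "9", "2"))); // prints [10, 2, 9]
    // Padded names sort in numeric order.
    System.out.println(sortedCopy(Arrays.asList(pad(10), pad(9), pad(2))));
    // prints [00000002, 00000009, 00000010]
  }
}
```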
