本文整理了Java中org.apache.samza.zk.ZkKeyBuilder
类的一些代码示例,展示了ZkKeyBuilder
类的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。ZkKeyBuilder
类的具体详情如下:
包路径:org.apache.samza.zk.ZkKeyBuilder
类名称:ZkKeyBuilder
[英]The following ZK hierarchy is maintained for Standalone jobs:
- /
|- groupId/
|- JobModelGeneration/
|- jobModelVersion (data contains the version)
|- jobModelUpgradeBarrier/ (contains barrier related data)
|- jobModels/
|- 1 (contains job model version 1 as data)
|- 2
|- processors/
|- 00000001
|- 00000002
|- ...
Note: ZK Node levels without an ending forward slash ('/') represent a leaf node and non-leaf node, otherwise. This class provides helper methods to easily generate/parse the path in the ZK hierarchy.
[中]以下ZK层次结构是为独立作业维护的:
- /
|- groupId/
|- JobModelGeneration/
|- jobModelVersion (data contains the version)
|- jobModelUpgradeBarrier/ (contains barrier related data)
|- jobModels/
|- 1 (contains job model version 1 as data)
|- 2
|- processors/
|- 00000001
|- 00000002
|- ...
注意:没有结束正斜杠(“/”)的ZK节点级别表示叶节点和非叶节点,否则表示叶节点。这个类提供了助手方法,可以轻松地生成/解析ZK层次结构中的路径。
代码示例来源:origin: apache/samza
/**
* get the job model from ZK by version
* @param jobModelVersion jobModel version to get
* @return job model for this version
*/
public JobModel getJobModel(String jobModelVersion) {
LOG.info("Read the model ver=" + jobModelVersion + " from " + keyBuilder.getJobModelPath(jobModelVersion));
Object data = zkClient.readData(keyBuilder.getJobModelPath(jobModelVersion));
metrics.reads.inc();
ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
JobModel jm;
try {
jm = mmapper.readValue((String) data, JobModel.class);
} catch (IOException e) {
throw new SamzaException("failed to read JobModel from ZK", e);
}
return jm;
}
代码示例来源:origin: apache/samza
@Override
public void start() {
ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder();
zkUtils.validateZkVersion();
zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder.getJobModelPathPrefix(), keyBuilder.getTaskLocalityPath()});
systemAdmins.start();
leaderElector.tryBecomeLeader();
zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler(zkUtils));
}
代码示例来源:origin: apache/samza
void deleteOldBarrierVersions(int numVersionsToLeave) {
// read current list of barriers
String path = keyBuilder.getJobModelVersionBarrierPrefix();
LOG.info("About to delete old barrier paths from " + path);
List<String> znodeIds = zkClient.getChildren(path);
LOG.info("List of all zkNodes: " + znodeIds);
deleteOldVersionPath(path, znodeIds, numVersionsToLeave, new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
// barrier's name format is barrier_<num>
return ZkBarrierForVersionUpgrade.getVersion(o1) - ZkBarrierForVersionUpgrade.getVersion(o2);
}
});
}
代码示例来源:origin: apache/samza
@Test
public void testJobModelPath() {
ZkKeyBuilder builder = new ZkKeyBuilder("test");
Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/jobModelVersion", builder.getJobModelVersionPath());
Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/jobModels", builder.getJobModelPathPrefix());
String version = "2";
Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/jobModels/" + version, builder.getJobModelPath(version));
Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/" + ZkKeyBuilder.JOB_MODEL_UPGRADE_BARRIER_PATH + "/versionBarriers", builder.getJobModelVersionBarrierPrefix());
}
}
代码示例来源:origin: org.apache.samza/samza-core_2.12
@Override
public void start() {
ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder();
zkUtils.validateZkVersion();
zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder
.getJobModelPathPrefix()});
startMetrics();
systemAdmins.start();
leaderElector.tryBecomeLeader();
zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler(zkUtils));
}
代码示例来源:origin: apache/samza
@Test
public void testgetNextJobModelVersion() {
// Set up the Zk base paths for testing.
ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test");
String root = keyBuilder.getRootPath();
zkClient.deleteRecursive(root);
zkUtils.validatePaths(new String[]{root, keyBuilder.getJobModelPathPrefix(), keyBuilder.getJobModelVersionPath()});
String version = "1";
String oldVersion = "0";
// Set zkNode JobModelVersion to 1.
zkUtils.publishJobModelVersion(oldVersion, version);
Assert.assertEquals(version, zkUtils.getJobModelVersion());
// Publish JobModel with a higher version (2).
zkUtils.publishJobModel("2", new JobModel(new MapConfig(), new HashMap<>()));
// Get on the JobModel version should return 2, taking into account the published version 2.
Assert.assertEquals("3", zkUtils.getNextJobModelVersion(zkUtils.getJobModelVersion()));
}
代码示例来源:origin: apache/samza
int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(currentPath));
if (currentSubscription != null) {
LOG.debug(zLog("Unsubscribing data change for " + currentSubscription));
zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
previousProcessorChangeListener);
previousProcessorChangeListener = new PreviousProcessorChangeListener(zkUtils);
zkUtils.subscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
previousProcessorChangeListener);
boolean predecessorExists = zkUtils.exists(keyBuilder.getProcessorsPath() + "/" + currentSubscription);
if (predecessorExists) {
LOG.info(zLog("Predecessor still exists. Current subscription is valid. Continuing as non-leader."));
代码示例来源:origin: apache/samza
String getTaskLocalityPath() {
return String.format("%s/%s", getRootPath(), TASK_LOCALITY_PATH);
}
}
代码示例来源:origin: apache/samza
@Test(expected = IllegalArgumentException.class)
public void pathPrefixCannotBeEmpty() {
new ZkKeyBuilder(" ");
}
代码示例来源:origin: apache/samza
/**
* subscribe for changes of JobModel version
* @param dataListener describe this
*/
public void subscribeToJobModelVersionChange(GenerationAwareZkDataListener dataListener) {
LOG.info(" subscribing for jm version change at:" + keyBuilder.getJobModelVersionPath());
zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), dataListener);
metrics.subscriptions.inc();
}
代码示例来源:origin: apache/samza
/**
* subscribe to the changes in the list of processors in ZK
* @param listener - will be called when a processor is added or removed.
*/
public void subscribeToProcessorChange(IZkChildListener listener) {
LOG.info("Subscribing for child change at:" + keyBuilder.getProcessorsPath());
zkClient.subscribeChildChanges(keyBuilder.getProcessorsPath(), listener);
metrics.subscriptions.inc();
}
代码示例来源:origin: apache/samza
String getJobModelPath(String jobModelVersion) {
return String.format("%s/%s", getJobModelPathPrefix(), jobModelVersion);
}
代码示例来源:origin: apache/samza
@Test
public void testParseIdFromPath() {
Assert.assertEquals(
"1",
ZkKeyBuilder.parseIdFromPath("/test/processors/" + "1"));
Assert.assertNull(ZkKeyBuilder.parseIdFromPath(null));
Assert.assertNull(ZkKeyBuilder.parseIdFromPath(""));
}
代码示例来源:origin: apache/samza
@Test
public void testProcessorsPath() {
ZkKeyBuilder builder = new ZkKeyBuilder("test");
Assert.assertEquals("/test/" + ZkKeyBuilder.PROCESSORS_PATH, builder.getProcessorsPath());
}
代码示例来源:origin: org.apache.samza/samza-core_2.10
@Override
public void start() {
ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder();
zkUtils.validateZkVersion();
zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder
.getJobModelPathPrefix()});
startMetrics();
systemAdmins.start();
leaderElector.tryBecomeLeader();
zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler(zkUtils));
}
代码示例来源:origin: org.apache.samza/samza-core_2.12
int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(currentPath));
if (currentSubscription != null) {
LOG.debug(zLog("Unsubscribing data change for " + currentSubscription));
zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
previousProcessorChangeListener);
previousProcessorChangeListener = new PreviousProcessorChangeListener(zkUtils);
zkUtils.subscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
previousProcessorChangeListener);
boolean predecessorExists = zkUtils.exists(keyBuilder.getProcessorsPath() + "/" + currentSubscription);
if (predecessorExists) {
LOG.info(zLog("Predecessor still exists. Current subscription is valid. Continuing as non-leader."));
代码示例来源:origin: apache/samza
String getJobModelPathPrefix() {
return String.format("%s/%s/jobModels", getRootPath(), JOBMODEL_GENERATION_PATH);
}
代码示例来源:origin: apache/samza
@Test(expected = IllegalArgumentException.class)
public void pathPrefixCannotBeNull() {
new ZkKeyBuilder(null);
}
代码示例来源:origin: org.apache.samza/samza-core_2.12
/**
* subscribe for changes of JobModel version
* @param dataListener describe this
*/
public void subscribeToJobModelVersionChange(GenerationAwareZkDataListener dataListener) {
LOG.info(" subscribing for jm version change at:" + keyBuilder.getJobModelVersionPath());
zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), dataListener);
metrics.subscriptions.inc();
}
代码示例来源:origin: apache/samza
/**
* Method is used to get the <i>sorted</i> list of currently active/registered processors (znodes)
*
* @return List of absolute ZK node paths
*/
public List<String> getSortedActiveProcessorsZnodes() {
List<String> znodeIds = zkClient.getChildren(keyBuilder.getProcessorsPath());
if (znodeIds.size() > 0) {
Collections.sort(znodeIds);
LOG.info("Found these children - " + znodeIds);
}
return znodeIds;
}
内容来源于网络,如有侵权,请联系作者删除!