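This article collects code examples for the Java class org.apache.samza.zk.ZkUtils and shows how the class is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven; they were extracted from selected projects and are intended as a practical reference. Details of the ZkUtils class:
Package path: org.apache.samza.zk.ZkUtils
Class name: ZkUtils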
Util class to help manage the Zk connection and ZkClient. It also provides additional utility methods for read/write/subscribe/unsubscribe access to the ZK tree.
Note on ZkClient: ZkClient consists of two threads, an I/O thread and an Event thread. The I/O thread manages heartbeats to the Zookeeper server in the ensemble and handles responses to synchronous methods in the Zookeeper API. The Event thread typically receives all the Watcher events and delivers them to registered listeners. It also handles responses to asynchronous methods in the Zookeeper API.
Note on Session disconnect handling: After a session has timed out and been restored, we may still get some notifications from the old session. To avoid acting on them, ZkUtils keeps a currentGeneration member, which starts at 0 and is increased each time a new session is established. The current value of this member is passed to each Listener when it is created, so a callback from a Listener that carries an old generation id is ignored.
Note on Session Management: Session management, if needed, should be handled by the caller. This can be done by implementing org.I0Itec.zkclient.IZkStateListener and subscribing this listener to the current ZkClient. Note: the connection state change callbacks are invoked in the context of the Event thread of the ZkClient, so it is advised to do only non-blocking processing in the callbacks.
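The advice to keep state-change callbacks non-blocking can be illustrated with a minimal sketch. It does not use the real org.I0Itec.zkclient.IZkStateListener; the StateListener interface, the OffloadingListener class, and the state strings are hypothetical stand-ins so that the example compiles with the JDK alone. The callback only enqueues work, and a separate worker thread does the handling.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class NonBlockingStateListenerSketch {

  /** Hypothetical stand-in for a connection-state callback interface. */
  interface StateListener {
    void handleStateChanged(String state);
  }

  /** Enqueues the real work on a worker thread so the calling thread returns at once. */
  static final class OffloadingListener implements StateListener {
    private final ExecutorService worker;
    private final AtomicInteger expirations = new AtomicInteger();

    OffloadingListener(ExecutorService worker) {
      this.worker = worker;
    }

    @Override
    public void handleStateChanged(String state) {
      // Only hand the event off here; the potentially slow handling runs on the worker.
      worker.execute(() -> {
        if ("Expired".equals(state)) {
          expirations.incrementAndGet();
        }
      });
    }

    int expirationCount() {
      return expirations.get();
    }
  }

  /** Delivers two state changes and returns how many expirations were handled. */
  static int demo() {
    ExecutorService worker = Executors.newSingleThreadExecutor();
    OffloadingListener listener = new OffloadingListener(worker);
    listener.handleStateChanged("SyncConnected");
    listener.handleStateChanged("Expired");
    worker.shutdown();
    try {
      if (!worker.awaitTermination(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("worker did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return listener.expirationCount();
  }

  public static void main(String[] args) {
    System.out.println("expirations handled: " + demo());
  }
}
```

A single-threaded worker keeps the events in arrival order, which is usually what session-state handling needs.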
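The generation check described in the session disconnect note can be sketched as follows. This is an illustration under assumptions, not the ZkUtils implementation: GenerationGuardSketch, newListener, and the event strings are hypothetical names, and only currentGeneration and incGeneration echo identifiers that appear in this article.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class GenerationGuardSketch {
  // Starts at 0 and is bumped each time a new session is established.
  private final AtomicInteger currentGeneration = new AtomicInteger(0);
  private final List<String> delivered = Collections.synchronizedList(new ArrayList<>());

  /** Called when a new session is established. */
  void incGeneration() {
    currentGeneration.incrementAndGet();
  }

  /** Creates a listener stamped with the generation that is current right now. */
  GenerationAwareListener newListener() {
    return new GenerationAwareListener(currentGeneration.get());
  }

  /** Returns a copy of the events that passed the generation check. */
  List<String> delivered() {
    return new ArrayList<>(delivered);
  }

  final class GenerationAwareListener {
    private final int generation;

    GenerationAwareListener(int generation) {
      this.generation = generation;
    }

    void onEvent(String event) {
      if (generation != currentGeneration.get()) {
        return; // Stale callback from an old session: ignore it.
      }
      delivered.add(event);
    }
  }

  public static void main(String[] args) {
    GenerationGuardSketch zk = new GenerationGuardSketch();
    GenerationAwareListener oldListener = zk.newListener();
    oldListener.onEvent("before-expiry");
    zk.incGeneration(); // Session expired and a new one was established.
    oldListener.onEvent("late-event-from-old-session");
    zk.newListener().onEvent("after-reconnect");
    System.out.println(zk.delivered());
  }
}
```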
Code example source: origin: apache/samza
public void close() {
  try {
    if (zkUtils != null) {
      zkUtils.close();
    }
  } catch (ZkInterruptedException ex) {
    // Swallowed because it occurs in the last stage of the lifecycle (not actionable).
    LOG.error("Exception in close(): ", ex);
  }
}
Code example source: origin: apache/samza
/**
 * Cleans up old data from ZK.
 * @param numVersionsToLeave - number of versions to leave
 */
public void cleanupZK(int numVersionsToLeave) {
  deleteOldBarrierVersions(numVersionsToLeave);
  deleteOldJobModels(numVersionsToLeave);
}
Code example source: origin: apache/samza
void deleteOldBarrierVersions(int numVersionsToLeave) {
  // read current list of barriers
  String path = keyBuilder.getJobModelVersionBarrierPrefix();
  LOG.info("About to delete old barrier paths from " + path);
  List<String> znodeIds = zkClient.getChildren(path);
  LOG.info("List of all zkNodes: " + znodeIds);
  deleteOldVersionPath(path, znodeIds, numVersionsToLeave, new Comparator<String>() {
    @Override
    public int compare(String o1, String o2) {
      // barrier's name format is barrier_<num>
      return ZkBarrierForVersionUpgrade.getVersion(o1) - ZkBarrierForVersionUpgrade.getVersion(o2);
    }
  });
}
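The cleanup above sorts znode names by their numeric version and keeps only the most recent ones. The body of deleteOldVersionPath is not shown in this article, so the following is only a sketch of that selection step under assumptions: OldVersionSelectorSketch, versionOf, and selectForDeletion are hypothetical names, and the sketch uses Comparator.comparingInt rather than subtraction so the comparison cannot overflow.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class OldVersionSelectorSketch {

  /** Parses the number from names such as "barrier_205" or plain "105". */
  static int versionOf(String znode) {
    return Integer.parseInt(znode.substring(znode.lastIndexOf('_') + 1));
  }

  /** Returns the oldest znodes, leaving the numVersionsToLeave newest ones out of the result. */
  static List<String> selectForDeletion(List<String> znodeIds, int numVersionsToLeave) {
    if (numVersionsToLeave < 0) {
      throw new IllegalArgumentException("numVersionsToLeave must not be negative");
    }
    List<String> sorted = new ArrayList<>(znodeIds);
    sorted.sort(Comparator.comparingInt(OldVersionSelectorSketch::versionOf));
    int deleteCount = Math.max(0, sorted.size() - numVersionsToLeave);
    return new ArrayList<>(sorted.subList(0, deleteCount));
  }

  public static void main(String[] args) {
    List<String> barriers = Arrays.asList("barrier_10", "barrier_9", "barrier_11");
    System.out.println(selectForDeletion(barriers, 2));
  }
}
```

Sorting numerically matters here: a plain string sort would place "barrier_10" before "barrier_9".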
Code example source: origin: apache/samza
/**
 * Gets the list of currently active/registered processor ids.
 * @return List of processorIds
 */
public List<String> getSortedActiveProcessorsIDs() {
  return getActiveProcessorsIDs(getSortedActiveProcessorsZnodes());
}
Code example source: origin: org.apache.samza/samza-core
void doOnProcessorChange(List<String> processors) {
  List<String> currentProcessorIds = zkUtils.getSortedActiveProcessorsIDs();
  Set<String> uniqueProcessorIds = new HashSet<>(currentProcessorIds);
  String currentJMVersion = zkUtils.getJobModelVersion();
  String nextJMVersion = zkUtils.getNextJobModelVersion(currentJMVersion);
  LOG.info("pid=" + processorId + "Generated new JobModel with version: " + nextJMVersion + " and processors: " + currentProcessorIds);
  zkUtils.publishJobModel(nextJMVersion, jobModel);
  zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
  debounceTimer.scheduleAfterDebounceTime(ON_ZK_CLEANUP, 0, () -> zkUtils.cleanupZK(NUM_VERSIONS_TO_LEAVE));
  // ... (the rest of the method is not included in this excerpt)
Code example source: origin: apache/samza
String currentPath = zkUtils.registerProcessorAndGetId(new ProcessorData(hostName, processorIdStr));
List<String> children = zkUtils.getSortedActiveProcessorsZnodes();
LOG.debug(zLog("Current active processors - " + children));
int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(currentPath));
if (currentSubscription != null) {
LOG.debug(zLog("Unsubscribing data change for " + currentSubscription));
zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
previousProcessorChangeListener);
previousProcessorChangeListener = new PreviousProcessorChangeListener(zkUtils);
zkUtils.subscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
previousProcessorChangeListener);
boolean predecessorExists = zkUtils.exists(keyBuilder.getProcessorsPath() + "/" + currentSubscription);
if (predecessorExists) {
LOG.info(zLog("Predecessor still exists. Current subscription is valid. Continuing as non-leader."));
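The excerpt above looks up the current processor's position in the sorted list of active znodes and subscribes to the znode just ahead of it. The decision can be sketched as below; this is an illustration under assumptions, not the Samza leader elector. PredecessorSketch and predecessorOf are hypothetical names, and the rule "first in the sorted list is the leader, everyone else watches the entry immediately before it" is the usual ZooKeeper election recipe rather than something this excerpt states in full.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

final class PredecessorSketch {

  /**
   * Returns the znode that self should watch, or an empty Optional when self is
   * first in the sorted list and therefore has no predecessor (the leader case).
   */
  static Optional<String> predecessorOf(List<String> sortedChildren, String self) {
    int index = sortedChildren.indexOf(self);
    if (index < 0) {
      throw new IllegalStateException(self + " is not registered among " + sortedChildren);
    }
    return index == 0 ? Optional.empty() : Optional.of(sortedChildren.get(index - 1));
  }

  public static void main(String[] args) {
    List<String> children = Arrays.asList("0000000001", "0000000002", "0000000003");
    System.out.println(predecessorOf(children, "0000000001").isPresent());
    System.out.println(predecessorOf(children, "0000000003").orElse("none"));
  }
}
```

Watching only the immediate predecessor means that a single znode deletion wakes up a single processor instead of every participant.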
Code example source: origin: apache/samza
void doOnProcessorChange() {
  List<ProcessorNode> processorNodes = zkUtils.getAllProcessorNodes();
  String currentJMVersion = zkUtils.getJobModelVersion();
  String nextJMVersion = zkUtils.getNextJobModelVersion(currentJMVersion);
  LOG.info("pid=" + processorId + "Generated new JobModel with version: " + nextJMVersion + " and processors: " + currentProcessorIds);
  zkUtils.publishJobModel(nextJMVersion, jobModel);
  zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
  debounceTimer.scheduleAfterDebounceTime(ON_ZK_CLEANUP, 0, () -> zkUtils.cleanupZK(NUM_VERSIONS_TO_LEAVE));
  // ... (the rest of the method is not included in this excerpt)
Code example source: origin: apache/samza
@Test
public void testShouldStopPartitionCountMonitorWhenStoppingTheJobCoordinator() {
  ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
  ZkClient mockZkClient = Mockito.mock(ZkClient.class);
  when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
  ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
  when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
  when(zkUtils.getZkClient()).thenReturn(mockZkClient);
  when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
  ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
  ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
  StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class);
  zkJobCoordinator.debounceTimer = mockDebounceTimer;
  zkJobCoordinator.streamPartitionCountMonitor = monitor;
  zkJobCoordinator.stop();
  Mockito.verify(monitor).stop();
}
Code example source: origin: apache/samza
@Test
public void testgetNextJobModelVersion() {
  // Set up the Zk base paths for testing.
  ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test");
  String root = keyBuilder.getRootPath();
  zkClient.deleteRecursive(root);
  zkUtils.validatePaths(new String[]{root, keyBuilder.getJobModelPathPrefix(), keyBuilder.getJobModelVersionPath()});
  String version = "1";
  String oldVersion = "0";
  // Set zkNode JobModelVersion to 1.
  zkUtils.publishJobModelVersion(oldVersion, version);
  Assert.assertEquals(version, zkUtils.getJobModelVersion());
  // Publish JobModel with a higher version (2).
  zkUtils.publishJobModel("2", new JobModel(new MapConfig(), new HashMap<>()));
  // The next JobModel version should be 3, taking into account the published version 2.
  Assert.assertEquals("3", zkUtils.getNextJobModelVersion(zkUtils.getJobModelVersion()));
}
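The test above expects "3" as the next version when the version znode holds "1" and a JobModel has already been published under "2". One rule consistent with that assertion is "take the highest version seen anywhere and add one". The sketch below implements that rule only as an assumption; the actual logic inside ZkUtils.getNextJobModelVersion is not shown in this article, and NextVersionSketch and nextVersion are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class NextVersionSketch {

  /** Returns max(currentVersion, every published version) + 1 as a string. */
  static String nextVersion(String currentVersion, List<String> publishedVersions) {
    long max = Long.parseLong(currentVersion);
    for (String published : publishedVersions) {
      max = Math.max(max, Long.parseLong(published));
    }
    return Long.toString(max + 1);
  }

  public static void main(String[] args) {
    System.out.println(nextVersion("1", Arrays.asList("2")));
    System.out.println(nextVersion("1", Collections.<String>emptyList()));
  }
}
```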
Code example source: origin: apache/samza
@Test
public void testPublishNewJobModel() {
  ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test");
  String root = keyBuilder.getRootPath();
  zkClient.deleteRecursive(root);
  String version = "1";
  String oldVersion = "0";
  zkUtils.validatePaths(new String[]{root, keyBuilder.getJobModelPathPrefix(), keyBuilder.getJobModelVersionPath()});
  zkUtils.publishJobModelVersion(oldVersion, version);
  Assert.assertEquals(version, zkUtils.getJobModelVersion());
  String newerVersion = Long.toString(Long.parseLong(version) + 1);
  zkUtils.publishJobModelVersion(version, newerVersion);
  Assert.assertEquals(newerVersion, zkUtils.getJobModelVersion());
  try {
    zkUtils.publishJobModelVersion(oldVersion, "10"); // invalid new version
    Assert.fail("publish invalid version should've failed");
  } catch (SamzaException e) {
    // expected
  }
  // create job model
  Map<String, String> configMap = new HashMap<>();
  Map<String, ContainerModel> containers = new HashMap<>();
  MapConfig config = new MapConfig(configMap);
  JobModel jobModel = new JobModel(config, containers);
  zkUtils.publishJobModel(version, jobModel);
  Assert.assertEquals(jobModel, zkUtils.getJobModel(version));
}
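In the test above, publishing succeeds when the caller passes the version that is currently stored and fails when it passes a stale one ("0" while the stored version is "2"). One plausible reading is a compare-then-set check on the stored version. The sketch below shows that pattern with an in-memory register; VersionRegisterSketch is a hypothetical name, IllegalStateException stands in for SamzaException, and the real ZkUtils may validate additional conditions that are not visible here.

```java
final class VersionRegisterSketch {
  private String current; // null until the first publish

  /** Stores next only if nothing is stored yet or the stored value equals expectedOld. */
  synchronized void publish(String expectedOld, String next) {
    if (current != null && !current.equals(expectedOld)) {
      throw new IllegalStateException(
          "expected current version " + expectedOld + " but found " + current);
    }
    current = next;
  }

  synchronized String get() {
    return current;
  }

  public static void main(String[] args) {
    VersionRegisterSketch register = new VersionRegisterSketch();
    register.publish("0", "1");
    register.publish("1", "2");
    try {
      register.publish("0", "10"); // stale expected version
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
    System.out.println("current = " + register.get());
  }
}
```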
Code example source: origin: apache/samza
@Test
public void testCleanUpZkJobModels() {
  String root = zkUtils.getKeyBuilder().getJobModelPathPrefix();
  System.out.println("root=" + root);
  zkUtils.getZkClient().createPersistent(root, true);
  // generate multiple versions
  for (int i = 101; i < 110; i++) {
    zkUtils.publishJobModel(String.valueOf(i), null);
  }
  // clean all of the versions except the 5 most recent ones
  zkUtils.deleteOldJobModels(5);
  Assert.assertEquals(Arrays.asList("105", "106", "107", "108", "109"), zkUtils.getZkClient().getChildren(root));
}
Code example source: origin: apache/samza
@Test
public void testShouldRemoveBufferedEventsInDebounceQueueOnSessionExpiration() {
  ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
  ZkClient mockZkClient = Mockito.mock(ZkClient.class);
  when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
  ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
  when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
  when(zkUtils.getZkClient()).thenReturn(mockZkClient);
  when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
  ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
  ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
  zkJobCoordinator.debounceTimer = mockDebounceTimer;
  zkJobCoordinator.zkSessionMetrics = new ZkSessionMetrics(new MetricsRegistryMap());
  final ZkSessionStateChangedListener zkSessionStateChangedListener = zkJobCoordinator.new ZkSessionStateChangedListener();
  zkSessionStateChangedListener.handleStateChanged(Watcher.Event.KeeperState.Expired);
  verify(zkUtils).incGeneration();
  verify(mockDebounceTimer).cancelAllScheduledActions();
  verify(mockDebounceTimer).scheduleAfterDebounceTime(Mockito.eq("ZK_SESSION_EXPIRED"), Mockito.eq(0L), Mockito.any(Runnable.class));
  Assert.assertEquals(1, zkJobCoordinator.zkSessionMetrics.zkSessionExpirations.getCount());
}
Code example source: origin: org.apache.samza/samza-core_2.12
/**
 * Generate a new JobModel when becoming a leader or when the list of processors changes.
 */
private JobModel generateNewJobModel(List<String> processors) {
  String zkJobModelVersion = zkUtils.getJobModelVersion();
  // If JobModel exists in zookeeper && cached JobModel version is unequal to JobModel version stored in zookeeper.
  if (zkJobModelVersion != null && !Objects.equals(cachedJobModelVersion, zkJobModelVersion)) {
    JobModel jobModel = zkUtils.getJobModel(zkJobModelVersion);
    for (ContainerModel containerModel : jobModel.getContainers().values()) {
      containerModel.getTasks().forEach((taskName, taskModel) -> changeLogPartitionMap.put(taskName, taskModel.getChangelogPartition().getPartitionId()));
    }
    cachedJobModelVersion = zkJobModelVersion;
  }
  /**
   * Host affinity is not supported in standalone. Hence, LocalityManager (which is responsible for container
   * to host mapping) is passed in as null when building the jobModel.
   */
  JobModel model = JobModelManager.readJobModel(this.config, changeLogPartitionMap, null, streamMetadataCache, processors);
  return new JobModel(new MapConfig(), model.getContainers());
}
Code example source: origin: apache/samza
zkUtils1.registerProcessorAndGetId(new ProcessorData("processor1", "1"));
ZkLeaderElector leaderElector1 = new ZkLeaderElector("processor1", zkUtils1, new IZkDataListener() {
@Override
zkUtils2.registerProcessorAndGetId(new ProcessorData("processor2", "2"));
ZkLeaderElector leaderElector2 = new ZkLeaderElector("processor2", zkUtils2, new IZkDataListener() {
@Override
final String path3 = zkUtils3.registerProcessorAndGetId(new ProcessorData("processor3", "3"));
ZkLeaderElector leaderElector3 = new ZkLeaderElector("processor3", zkUtils3, new IZkDataListener() {
@Override
Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader3.res, 2, 100));
List<String> currentActiveProcessors = zkUtils1.getSortedActiveProcessorsZnodes();
Assert.assertEquals(3, currentActiveProcessors.size());
zkUtils2.close();
currentActiveProcessors.remove(1);
Assert.assertEquals(currentActiveProcessors, zkUtils1.getSortedActiveProcessorsZnodes());
zkUtils1.close();
zkUtils3.close();
Code example source: origin: apache/samza
@Test
public void testCleanUpZkBarrierVersion() {
  String root = zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix();
  zkUtils.getZkClient().createPersistent(root, true);
  ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(root, zkUtils, null, null);
  for (int i = 200; i < 210; i++) {
    barrier.create(String.valueOf(i), new ArrayList<>(Arrays.asList(i + "a", i + "b", i + "c")));
  }
  zkUtils.deleteOldBarrierVersions(5);
  List<String> zNodeIds = zkUtils.getZkClient().getChildren(root);
  Collections.sort(zNodeIds);
  Assert.assertEquals(Arrays.asList("barrier_205", "barrier_206", "barrier_207", "barrier_208", "barrier_209"),
      zNodeIds);
}
Code example source: origin: apache/samza
private ZkUtils getZkUtils() {
  return new ZkUtils(KEY_BUILDER, zkClient, CONNECTION_TIMEOUT_MS,
      SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
}
Code example source: origin: apache/samza
@Test
public void testGetProcessorsIDs() {
  Assert.assertEquals(0, zkUtils.getSortedActiveProcessorsIDs().size());
  zkUtils.registerProcessorAndGetId(new ProcessorData("host1", "1"));
  List<String> l = zkUtils.getSortedActiveProcessorsIDs();
  Assert.assertEquals(1, l.size());
  new ZkUtils(KEY_BUILDER, zkClient, CONNECTION_TIMEOUT_MS, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry())
      .registerProcessorAndGetId(new ProcessorData("host2", "2"));
  l = zkUtils.getSortedActiveProcessorsIDs();
  Assert.assertEquals(2, l.size());
  Assert.assertEquals(" ID1 didn't match", "1", l.get(0));
  Assert.assertEquals(" ID2 didn't match", "2", l.get(1));
}
Code example source: origin: apache/samza
@Test
public void testCloseShouldRetryOnceOnInterruptedException() {
  ZkClient zkClient = Mockito.mock(ZkClient.class);
  ZkUtils zkUtils = new ZkUtils(KEY_BUILDER, zkClient, CONNECTION_TIMEOUT_MS, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
  // The first close() call throws, the second one succeeds.
  Mockito.doThrow(new ZkInterruptedException(new InterruptedException()))
      .doAnswer(invocation -> null)
      .when(zkClient).close();
  zkUtils.close();
  Mockito.verify(zkClient, Mockito.times(2)).close();
}
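The test above verifies that close() is attempted a second time after the first attempt is interrupted. A retry-once wrapper of that kind can be sketched as below. This is not the body of ZkUtils.close(), which is not shown in this article; RetryOnceCloseSketch, closeWithOneRetry, and InterruptedCloseException (standing in for ZkInterruptedException) are hypothetical names, and clearing the interrupt flag before the retry is an assumption about what makes the second attempt viable.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RetryOnceCloseSketch {

  /** Hypothetical unchecked exception standing in for an interrupted close. */
  static final class InterruptedCloseException extends RuntimeException {
    InterruptedCloseException(String message) {
      super(message);
    }
  }

  /** Runs closeAction, retrying exactly once if it was interrupted; returns the attempt count. */
  static int closeWithOneRetry(Runnable closeAction) {
    try {
      closeAction.run();
      return 1;
    } catch (InterruptedCloseException first) {
      // Clear the interrupt flag so the retry is not interrupted immediately.
      Thread.interrupted();
      closeAction.run(); // A second failure propagates to the caller.
      return 2;
    }
  }

  /** Simulates a close that fails on the first call only and returns how often it was called. */
  static int demo() {
    AtomicInteger calls = new AtomicInteger();
    closeWithOneRetry(() -> {
      if (calls.incrementAndGet() == 1) {
        throw new InterruptedCloseException("interrupted during close");
      }
    });
    return calls.get();
  }

  public static void main(String[] args) {
    System.out.println("close attempts: " + demo());
  }
}
```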
Code example source: origin: apache/samza
@After
public void testTeardown() {
  testZkUtils.getZkClient().deleteRecursive(KEY_BUILDER.getRootPath());
  testZkUtils.close();
}
Code example source: origin: apache/samza
String participant1 = "participant1";
ZkUtils zkUtils = getZkUtilsWithNewClient(participant1);
zkUtils.connect();
Latch latch = new ZkProcessorLatch(latchSize, latchId, participant1, zkUtils);
Assert.fail(String.format("await timed out from %s - %s", participant1, e.getLocalizedMessage()));
} finally {
zkUtils.close();
testZkUtils.getZkClient().getChildren(
String.format("%s/%s_%s", KEY_BUILDER.getRootPath(), ZkProcessorLatch.LATCH_PATH, latchId));
Assert.assertNotNull(latchParticipants);