本文整理了Java中org.apache.samza.zk.ZkUtils.getJobModel()
方法的一些代码示例,展示了ZkUtils.getJobModel()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。ZkUtils.getJobModel()
方法的具体详情如下:
包路径:org.apache.samza.zk.ZkUtils
类名称:ZkUtils
方法名:getJobModel
[英]get the job model from ZK by version
[中]按版本从ZK获取工作模型
代码示例来源:origin: org.apache.samza/samza-core_2.11
/**
* Invoked when there is a change to the JobModelVersion z-node. It signifies that a new JobModel version is available.
*/
@Override
public void doHandleDataChange(String dataPath, Object data) {
debounceTimer.scheduleAfterDebounceTime(JOB_MODEL_VERSION_CHANGE, 0, () -> {
String jobModelVersion = (String) data;
LOG.info("Got a notification for new JobModel version. Path = {} Version = {}", dataPath, data);
newJobModel = zkUtils.getJobModel(jobModelVersion);
LOG.info("pid=" + processorId + ": new JobModel is available. Version =" + jobModelVersion + "; JobModel = " + newJobModel);
if (!newJobModel.getContainers().containsKey(processorId)) {
LOG.info("New JobModel does not contain pid={}. Stopping this processor. New JobModel: {}",
processorId, newJobModel);
stop();
} else {
// stop current work
if (coordinatorListener != null) {
coordinatorListener.onJobModelExpired();
}
// update ZK and wait for all the processors to get this new version
barrier.join(jobModelVersion, processorId);
}
});
}
代码示例来源:origin: org.apache.samza/samza-core_2.11
/**
* Generate new JobModel when becoming a leader or the list of processor changed.
*/
private JobModel generateNewJobModel(List<String> processors) {
String zkJobModelVersion = zkUtils.getJobModelVersion();
// If JobModel exists in zookeeper && cached JobModel version is unequal to JobModel version stored in zookeeper.
if (zkJobModelVersion != null && !Objects.equals(cachedJobModelVersion, zkJobModelVersion)) {
JobModel jobModel = zkUtils.getJobModel(zkJobModelVersion);
for (ContainerModel containerModel : jobModel.getContainers().values()) {
containerModel.getTasks().forEach((taskName, taskModel) -> changeLogPartitionMap.put(taskName, taskModel.getChangelogPartition().getPartitionId()));
}
cachedJobModelVersion = zkJobModelVersion;
}
/**
* Host affinity is not supported in standalone. Hence, LocalityManager(which is responsible for container
* to host mapping) is passed in as null when building the jobModel.
*/
JobModel model = JobModelManager.readJobModel(this.config, changeLogPartitionMap, null, streamMetadataCache, processors);
return new JobModel(new MapConfig(), model.getContainers());
}
代码示例来源:origin: org.apache.samza/samza-core_2.12
/**
* Generate new JobModel when becoming a leader or the list of processor changed.
*/
private JobModel generateNewJobModel(List<String> processors) {
String zkJobModelVersion = zkUtils.getJobModelVersion();
// If JobModel exists in zookeeper && cached JobModel version is unequal to JobModel version stored in zookeeper.
if (zkJobModelVersion != null && !Objects.equals(cachedJobModelVersion, zkJobModelVersion)) {
JobModel jobModel = zkUtils.getJobModel(zkJobModelVersion);
for (ContainerModel containerModel : jobModel.getContainers().values()) {
containerModel.getTasks().forEach((taskName, taskModel) -> changeLogPartitionMap.put(taskName, taskModel.getChangelogPartition().getPartitionId()));
}
cachedJobModelVersion = zkJobModelVersion;
}
/**
* Host affinity is not supported in standalone. Hence, LocalityManager(which is responsible for container
* to host mapping) is passed in as null when building the jobModel.
*/
JobModel model = JobModelManager.readJobModel(this.config, changeLogPartitionMap, null, streamMetadataCache, processors);
return new JobModel(new MapConfig(), model.getContainers());
}
代码示例来源:origin: apache/samza
/**
* Invoked when there is a change to the JobModelVersion z-node. It signifies that a new JobModel version is available.
*/
@Override
public void doHandleDataChange(String dataPath, Object data) {
debounceTimer.scheduleAfterDebounceTime(JOB_MODEL_VERSION_CHANGE, 0, () -> {
String jobModelVersion = (String) data;
LOG.info("Got a notification for new JobModel version. Path = {} Version = {}", dataPath, data);
newJobModel = zkUtils.getJobModel(jobModelVersion);
LOG.info("pid=" + processorId + ": new JobModel is available. Version =" + jobModelVersion + "; JobModel = " + newJobModel);
if (!newJobModel.getContainers().containsKey(processorId)) {
LOG.info("New JobModel does not contain pid={}. Stopping this processor. New JobModel: {}",
processorId, newJobModel);
stop();
} else {
// stop current work
if (coordinatorListener != null) {
coordinatorListener.onJobModelExpired();
}
// update ZK and wait for all the processors to get this new version
barrier.join(jobModelVersion, processorId);
}
});
}
代码示例来源:origin: apache/samza
/**
* Generate new JobModel when becoming a leader or the list of processor changed.
*/
private JobModel generateNewJobModel(List<ProcessorNode> processorNodes) {
String zkJobModelVersion = zkUtils.getJobModelVersion();
// If JobModel exists in zookeeper && cached JobModel version is unequal to JobModel version stored in zookeeper.
if (zkJobModelVersion != null && !Objects.equals(cachedJobModelVersion, zkJobModelVersion)) {
JobModel jobModel = zkUtils.getJobModel(zkJobModelVersion);
for (ContainerModel containerModel : jobModel.getContainers().values()) {
containerModel.getTasks().forEach((taskName, taskModel) -> changeLogPartitionMap.put(taskName, taskModel.getChangelogPartition().getPartitionId()));
}
cachedJobModelVersion = zkJobModelVersion;
}
GrouperMetadata grouperMetadata = getGrouperMetadata(zkJobModelVersion, processorNodes);
JobModel model = JobModelManager.readJobModel(config, changeLogPartitionMap, streamMetadataCache, grouperMetadata);
return new JobModel(new MapConfig(), model.getContainers());
}
代码示例来源:origin: org.apache.samza/samza-core_2.12
/**
* Invoked when there is a change to the JobModelVersion z-node. It signifies that a new JobModel version is available.
*/
@Override
public void doHandleDataChange(String dataPath, Object data) {
debounceTimer.scheduleAfterDebounceTime(JOB_MODEL_VERSION_CHANGE, 0, () -> {
String jobModelVersion = (String) data;
LOG.info("Got a notification for new JobModel version. Path = {} Version = {}", dataPath, data);
newJobModel = zkUtils.getJobModel(jobModelVersion);
LOG.info("pid=" + processorId + ": new JobModel is available. Version =" + jobModelVersion + "; JobModel = " + newJobModel);
if (!newJobModel.getContainers().containsKey(processorId)) {
LOG.info("New JobModel does not contain pid={}. Stopping this processor. New JobModel: {}",
processorId, newJobModel);
stop();
} else {
// stop current work
if (coordinatorListener != null) {
coordinatorListener.onJobModelExpired();
}
// update ZK and wait for all the processors to get this new version
barrier.join(jobModelVersion, processorId);
}
});
}
代码示例来源:origin: org.apache.samza/samza-core_2.10
/**
* Generate new JobModel when becoming a leader or the list of processor changed.
*/
private JobModel generateNewJobModel(List<String> processors) {
String zkJobModelVersion = zkUtils.getJobModelVersion();
// If JobModel exists in zookeeper && cached JobModel version is unequal to JobModel version stored in zookeeper.
if (zkJobModelVersion != null && !Objects.equals(cachedJobModelVersion, zkJobModelVersion)) {
JobModel jobModel = zkUtils.getJobModel(zkJobModelVersion);
for (ContainerModel containerModel : jobModel.getContainers().values()) {
containerModel.getTasks().forEach((taskName, taskModel) -> changeLogPartitionMap.put(taskName, taskModel.getChangelogPartition().getPartitionId()));
}
cachedJobModelVersion = zkJobModelVersion;
}
/**
* Host affinity is not supported in standalone. Hence, LocalityManager(which is responsible for container
* to host mapping) is passed in as null when building the jobModel.
*/
JobModel model = JobModelManager.readJobModel(this.config, changeLogPartitionMap, null, streamMetadataCache, processors);
return new JobModel(new MapConfig(), model.getContainers());
}
代码示例来源:origin: org.apache.samza/samza-core_2.10
/**
* Invoked when there is a change to the JobModelVersion z-node. It signifies that a new JobModel version is available.
*/
@Override
public void doHandleDataChange(String dataPath, Object data) {
debounceTimer.scheduleAfterDebounceTime(JOB_MODEL_VERSION_CHANGE, 0, () -> {
String jobModelVersion = (String) data;
LOG.info("Got a notification for new JobModel version. Path = {} Version = {}", dataPath, data);
newJobModel = zkUtils.getJobModel(jobModelVersion);
LOG.info("pid=" + processorId + ": new JobModel is available. Version =" + jobModelVersion + "; JobModel = " + newJobModel);
if (!newJobModel.getContainers().containsKey(processorId)) {
LOG.info("New JobModel does not contain pid={}. Stopping this processor. New JobModel: {}",
processorId, newJobModel);
stop();
} else {
// stop current work
if (coordinatorListener != null) {
coordinatorListener.onJobModelExpired();
}
// update ZK and wait for all the processors to get this new version
barrier.join(jobModelVersion, processorId);
}
});
}
代码示例来源:origin: org.apache.samza/samza-core
/**
* Generate new JobModel when becoming a leader or the list of processor changed.
*/
private JobModel generateNewJobModel(List<String> processors) {
String zkJobModelVersion = zkUtils.getJobModelVersion();
// If JobModel exists in zookeeper && cached JobModel version is unequal to JobModel version stored in zookeeper.
if (zkJobModelVersion != null && !Objects.equals(cachedJobModelVersion, zkJobModelVersion)) {
JobModel jobModel = zkUtils.getJobModel(zkJobModelVersion);
for (ContainerModel containerModel : jobModel.getContainers().values()) {
containerModel.getTasks().forEach((taskName, taskModel) -> changeLogPartitionMap.put(taskName, taskModel.getChangelogPartition().getPartitionId()));
}
cachedJobModelVersion = zkJobModelVersion;
}
/**
* Host affinity is not supported in standalone. Hence, LocalityManager(which is responsible for container
* to host mapping) is passed in as null when building the jobModel.
*/
JobModel model = JobModelManager.readJobModel(this.config, changeLogPartitionMap, null, streamMetadataCache, processors);
return new JobModel(new MapConfig(), model.getContainers());
}
代码示例来源:origin: org.apache.samza/samza-core
/**
* Invoked when there is a change to the JobModelVersion z-node. It signifies that a new JobModel version is available.
*/
@Override
public void doHandleDataChange(String dataPath, Object data) {
debounceTimer.scheduleAfterDebounceTime(JOB_MODEL_VERSION_CHANGE, 0, () -> {
String jobModelVersion = (String) data;
LOG.info("Got a notification for new JobModel version. Path = {} Version = {}", dataPath, data);
newJobModel = zkUtils.getJobModel(jobModelVersion);
LOG.info("pid=" + processorId + ": new JobModel is available. Version =" + jobModelVersion + "; JobModel = " + newJobModel);
if (!newJobModel.getContainers().containsKey(processorId)) {
LOG.info("New JobModel does not contain pid={}. Stopping this processor. New JobModel: {}",
processorId, newJobModel);
stop();
} else {
// stop current work
if (coordinatorListener != null) {
coordinatorListener.onJobModelExpired();
}
// update ZK and wait for all the processors to get this new version
barrier.join(jobModelVersion, processorId);
}
});
}
代码示例来源:origin: apache/samza
@Test
public void testPublishNewJobModel() {
ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test");
String root = keyBuilder.getRootPath();
zkClient.deleteRecursive(root);
String version = "1";
String oldVersion = "0";
zkUtils.validatePaths(new String[]{root, keyBuilder.getJobModelPathPrefix(), keyBuilder.getJobModelVersionPath()});
zkUtils.publishJobModelVersion(oldVersion, version);
Assert.assertEquals(version, zkUtils.getJobModelVersion());
String newerVersion = Long.toString(Long.valueOf(version) + 1);
zkUtils.publishJobModelVersion(version, newerVersion);
Assert.assertEquals(newerVersion, zkUtils.getJobModelVersion());
try {
zkUtils.publishJobModelVersion(oldVersion, "10"); //invalid new version
Assert.fail("publish invalid version should've failed");
} catch (SamzaException e) {
// expected
}
// create job model
Map<String, String> configMap = new HashMap<>();
Map<String, ContainerModel> containers = new HashMap<>();
MapConfig config = new MapConfig(configMap);
JobModel jobModel = new JobModel(config, containers);
zkUtils.publishJobModel(version, jobModel);
Assert.assertEquals(jobModel, zkUtils.getJobModel(version));
}
代码示例来源:origin: apache/samza
/**
* Builds the {@link GrouperMetadataImpl} based upon provided {@param jobModelVersion}
* and {@param processorNodes}.
* @param jobModelVersion the most recent jobModelVersion available in the zookeeper.
* @param processorNodes the list of live processors in the zookeeper.
* @return the built grouper metadata.
*/
private GrouperMetadataImpl getGrouperMetadata(String jobModelVersion, List<ProcessorNode> processorNodes) {
Map<TaskName, String> taskToProcessorId = new HashMap<>();
Map<TaskName, List<SystemStreamPartition>> taskToSSPs = new HashMap<>();
if (jobModelVersion != null) {
JobModel jobModel = zkUtils.getJobModel(jobModelVersion);
for (ContainerModel containerModel : jobModel.getContainers().values()) {
for (TaskModel taskModel : containerModel.getTasks().values()) {
taskToProcessorId.put(taskModel.getTaskName(), containerModel.getId());
for (SystemStreamPartition partition : taskModel.getSystemStreamPartitions()) {
taskToSSPs.computeIfAbsent(taskModel.getTaskName(), k -> new ArrayList<>());
taskToSSPs.get(taskModel.getTaskName()).add(partition);
}
}
}
}
Map<String, LocationId> processorLocality = new HashMap<>();
for (ProcessorNode processorNode : processorNodes) {
ProcessorData processorData = processorNode.getProcessorData();
processorLocality.put(processorData.getProcessorId(), processorData.getLocationId());
}
Map<TaskName, LocationId> taskLocality = zkUtils.readTaskLocality();
return new GrouperMetadataImpl(processorLocality, taskLocality, taskToSSPs, taskToProcessorId);
}
代码示例来源:origin: apache/samza
@Test
public void testShouldStopPartitionCountMonitorWhenStoppingTheJobCoordinator() {
ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
ZkClient mockZkClient = Mockito.mock(ZkClient.class);
when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
when(zkUtils.getZkClient()).thenReturn(mockZkClient);
when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class);
zkJobCoordinator.debounceTimer = mockDebounceTimer;
zkJobCoordinator.streamPartitionCountMonitor = monitor;
zkJobCoordinator.stop();
Mockito.verify(monitor).stop();
}
}
代码示例来源:origin: apache/samza
@Test
public void testShouldStartPartitionCountMonitorOnBecomingLeader() {
ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
ZkClient mockZkClient = Mockito.mock(ZkClient.class);
when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
when(zkUtils.getZkClient()).thenReturn(mockZkClient);
when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class);
zkJobCoordinator.debounceTimer = mockDebounceTimer;
zkJobCoordinator.streamPartitionCountMonitor = monitor;
when(zkJobCoordinator.getPartitionCountMonitor()).thenReturn(monitor);
ZkJobCoordinator.LeaderElectorListenerImpl listener = zkJobCoordinator.new LeaderElectorListenerImpl();
listener.onBecomingLeader();
Mockito.verify(monitor).start();
}
代码示例来源:origin: apache/samza
@Test
public void testShouldStopPartitionCountMonitorOnSessionExpiration() {
ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
ZkClient mockZkClient = Mockito.mock(ZkClient.class);
when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
when(zkUtils.getZkClient()).thenReturn(mockZkClient);
when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class);
zkJobCoordinator.debounceTimer = mockDebounceTimer;
zkJobCoordinator.streamPartitionCountMonitor = monitor;
ZkSessionStateChangedListener zkSessionStateChangedListener = zkJobCoordinator.new ZkSessionStateChangedListener();
zkSessionStateChangedListener.handleStateChanged(Watcher.Event.KeeperState.Expired);
Mockito.verify(monitor).stop();
}
代码示例来源:origin: apache/samza
@Test
public void testFollowerShouldStopWhenNotPartOfGeneratedJobModel() throws Exception {
ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
ZkClient mockZkClient = Mockito.mock(ZkClient.class);
CountDownLatch jcShutdownLatch = new CountDownLatch(1);
when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
when(zkUtils.getZkClient()).thenReturn(mockZkClient);
when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
doAnswer(new Answer<Void>() {
public Void answer(InvocationOnMock invocation) {
jcShutdownLatch.countDown();
return null;
}
}).when(zkJobCoordinator).stop();
final ZkJobCoordinator.ZkJobModelVersionChangeHandler zkJobModelVersionChangeHandler = zkJobCoordinator.new ZkJobModelVersionChangeHandler(zkUtils);
zkJobModelVersionChangeHandler.doHandleDataChange("path", TEST_JOB_MODEL_VERSION);
verify(zkJobCoordinator, Mockito.atMost(1)).stop();
assertTrue("Timed out waiting for JobCoordinator to stop", jcShutdownLatch.await(1, TimeUnit.MINUTES));
}
代码示例来源:origin: apache/samza
@Test
public void testShouldRemoveBufferedEventsInDebounceQueueOnSessionExpiration() {
ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
ZkClient mockZkClient = Mockito.mock(ZkClient.class);
when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
when(zkUtils.getZkClient()).thenReturn(mockZkClient);
when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
zkJobCoordinator.debounceTimer = mockDebounceTimer;
zkJobCoordinator.zkSessionMetrics = new ZkSessionMetrics(new MetricsRegistryMap());
final ZkSessionStateChangedListener zkSessionStateChangedListener = zkJobCoordinator.new ZkSessionStateChangedListener();
zkSessionStateChangedListener.handleStateChanged(Watcher.Event.KeeperState.Expired);
verify(zkUtils).incGeneration();
verify(mockDebounceTimer).cancelAllScheduledActions();
verify(mockDebounceTimer).scheduleAfterDebounceTime(Mockito.eq("ZK_SESSION_EXPIRED"), Mockito.eq(0L), Mockito.any(Runnable.class));
Assert.assertEquals(1, zkJobCoordinator.zkSessionMetrics.zkSessionExpirations.getCount());
}
代码示例来源:origin: apache/samza
@Test
public void testZookeeperSessionMetricsAreUpdatedCoorrectly() {
ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
ZkClient mockZkClient = Mockito.mock(ZkClient.class);
when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
when(zkUtils.getZkClient()).thenReturn(mockZkClient);
when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
zkJobCoordinator.debounceTimer = mockDebounceTimer;
zkJobCoordinator.zkSessionMetrics = new ZkSessionMetrics(new MetricsRegistryMap());
final ZkSessionStateChangedListener zkSessionStateChangedListener = zkJobCoordinator.new ZkSessionStateChangedListener();
zkSessionStateChangedListener.handleStateChanged(Watcher.Event.KeeperState.Disconnected);
zkSessionStateChangedListener.handleStateChanged(Watcher.Event.KeeperState.SyncConnected);
zkSessionStateChangedListener.handleStateChanged(Watcher.Event.KeeperState.AuthFailed);
Assert.assertEquals(1, zkJobCoordinator.zkSessionMetrics.zkSessionErrors.getCount());
zkSessionStateChangedListener.handleSessionEstablishmentError(new SamzaException("Test exception"));
Assert.assertEquals(1, zkJobCoordinator.zkSessionMetrics.zkSessionDisconnects.getCount());
Assert.assertEquals(1, zkJobCoordinator.zkSessionMetrics.zkSyncConnected.getCount());
Assert.assertEquals(2, zkJobCoordinator.zkSessionMetrics.zkSessionErrors.getCount());
}
内容来源于网络,如有侵权,请联系作者删除!