org.apache.samza.zk.ZkUtils.getJobModel()方法的使用及代码示例

x33g5p2x  于2022-02-05 转载在 其他  
字(20.9k)|赞(0)|评价(0)|浏览(138)

本文整理了Java中org.apache.samza.zk.ZkUtils.getJobModel()方法的一些代码示例,展示了ZkUtils.getJobModel()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ZkUtils.getJobModel()方法的具体详情如下:
包路径:org.apache.samza.zk.ZkUtils
类名称:ZkUtils
方法名:getJobModel

ZkUtils.getJobModel介绍

[英]get the job model from ZK by version
[中]按版本从ZK获取作业模型(JobModel)

代码示例

代码示例来源:origin: org.apache.samza/samza-core_2.11

/**
 * Invoked when there is a change to the JobModelVersion z-node. It signifies that a new JobModel version is available.
 *
 * <p>The work is handed to {@code debounceTimer} under the {@code JOB_MODEL_VERSION_CHANGE} action with a
 * delay argument of 0. The scheduled action reads the JobModel for the new version from {@code zkUtils};
 * if that model has no container keyed by this processor's id it calls {@code stop()}, otherwise it notifies
 * the coordinator listener (when one is set) and joins the barrier for the new version.
 *
 * @param dataPath path of the changed z-node; used only for logging here
 * @param data     new z-node data; cast to {@code String} and treated as the JobModel version
 */
@Override
public void doHandleDataChange(String dataPath, Object data) {
 debounceTimer.scheduleAfterDebounceTime(JOB_MODEL_VERSION_CHANGE, 0, () -> {
   String jobModelVersion = (String) data;
   LOG.info("Got a notification for new JobModel version. Path = {} Version = {}", dataPath, data);
   newJobModel = zkUtils.getJobModel(jobModelVersion);
   // Parameterized form renders the same text as before but skips string building when INFO is disabled.
   LOG.info("pid={}: new JobModel is available. Version ={}; JobModel = {}", processorId, jobModelVersion, newJobModel);
   if (!newJobModel.getContainers().containsKey(processorId)) {
    LOG.info("New JobModel does not contain pid={}. Stopping this processor. New JobModel: {}",
      processorId, newJobModel);
    stop();
   } else {
    // stop current work
    if (coordinatorListener != null) {
     coordinatorListener.onJobModelExpired();
    }
    // update ZK and wait for all the processors to get this new version
    barrier.join(jobModelVersion, processorId);
   }
  });
}

代码示例来源:origin: org.apache.samza/samza-core_2.11

/**
 * Produces a new JobModel for the given processors. Called when this processor becomes the leader
 * or when the list of processors changes.
 *
 * @param processors the processors handed to {@code JobModelManager.readJobModel}
 * @return a JobModel carrying the generated containers and an empty {@code MapConfig}
 */
private JobModel generateNewJobModel(List<String> processors) {
 final String latestVersion = zkUtils.getJobModelVersion();
 // Refresh only when ZooKeeper holds a version and it differs from the one cached locally.
 final boolean cacheIsStale = latestVersion != null && !Objects.equals(cachedJobModelVersion, latestVersion);
 if (cacheIsStale) {
  // Record the changelog partition id of every task found in the published model.
  zkUtils.getJobModel(latestVersion)
    .getContainers()
    .values()
    .forEach(container -> container.getTasks().forEach(
      (name, task) -> changeLogPartitionMap.put(name, task.getChangelogPartition().getPartitionId())));
  cachedJobModelVersion = latestVersion;
 }
 // Host affinity is not supported in standalone. Hence, LocalityManager (which is responsible for
 // container to host mapping) is passed in as null when building the jobModel.
 JobModel generated = JobModelManager.readJobModel(this.config, changeLogPartitionMap, null, streamMetadataCache, processors);
 return new JobModel(new MapConfig(), generated.getContainers());
}

代码示例来源:origin: org.apache.samza/samza-core_2.12

/**
 * Generates the JobModel for the supplied processors; used on becoming leader or when the
 * processor list changes.
 *
 * @param processors the processors handed to {@code JobModelManager.readJobModel}
 * @return a new JobModel built from an empty {@code MapConfig} and the generated containers
 */
private JobModel generateNewJobModel(List<String> processors) {
 String versionInZk = zkUtils.getJobModelVersion();
 boolean versionPublished = versionInZk != null;
 // A published version that differs from the cached one means the changelog mapping must be re-read.
 if (versionPublished && !Objects.equals(cachedJobModelVersion, versionInZk)) {
  JobModel publishedModel = zkUtils.getJobModel(versionInZk);
  publishedModel.getContainers().values().forEach(container ->
    container.getTasks().forEach((name, task) ->
      changeLogPartitionMap.put(name, task.getChangelogPartition().getPartitionId())));
  cachedJobModelVersion = versionInZk;
 }
 // Host affinity is not supported in standalone, so null is passed where the LocalityManager
 // (responsible for container to host mapping) would go.
 JobModel freshModel = JobModelManager.readJobModel(this.config, changeLogPartitionMap, null, streamMetadataCache, processors);
 return new JobModel(new MapConfig(), freshModel.getContainers());
}

代码示例来源:origin: apache/samza

/**
 * Invoked when there is a change to the JobModelVersion z-node. It signifies that a new JobModel version is available.
 *
 * <p>The work is handed to {@code debounceTimer} under the {@code JOB_MODEL_VERSION_CHANGE} action with a
 * delay argument of 0. The scheduled action reads the JobModel for the new version from {@code zkUtils};
 * if that model has no container keyed by this processor's id it calls {@code stop()}, otherwise it notifies
 * the coordinator listener (when one is set) and joins the barrier for the new version.
 *
 * @param dataPath path of the changed z-node; used only for logging here
 * @param data     new z-node data; cast to {@code String} and treated as the JobModel version
 */
@Override
public void doHandleDataChange(String dataPath, Object data) {
 debounceTimer.scheduleAfterDebounceTime(JOB_MODEL_VERSION_CHANGE, 0, () -> {
   String jobModelVersion = (String) data;
   LOG.info("Got a notification for new JobModel version. Path = {} Version = {}", dataPath, data);
   newJobModel = zkUtils.getJobModel(jobModelVersion);
   // Parameterized form renders the same text as before but skips string building when INFO is disabled.
   LOG.info("pid={}: new JobModel is available. Version ={}; JobModel = {}", processorId, jobModelVersion, newJobModel);
   if (!newJobModel.getContainers().containsKey(processorId)) {
    LOG.info("New JobModel does not contain pid={}. Stopping this processor. New JobModel: {}",
      processorId, newJobModel);
    stop();
   } else {
    // stop current work
    if (coordinatorListener != null) {
     coordinatorListener.onJobModelExpired();
    }
    // update ZK and wait for all the processors to get this new version
    barrier.join(jobModelVersion, processorId);
   }
  });
}

代码示例来源:origin: apache/samza

/**
 * Produces a new JobModel for the given processor nodes. Called when this processor becomes the
 * leader or when the list of processors changes.
 *
 * @param processorNodes the processor nodes used to build the grouper metadata
 * @return a JobModel carrying the generated containers and an empty {@code MapConfig}
 */
private JobModel generateNewJobModel(List<ProcessorNode> processorNodes) {
 final String latestVersion = zkUtils.getJobModelVersion();
 // Refresh only when ZooKeeper holds a version and it differs from the one cached locally.
 final boolean cacheIsStale = latestVersion != null && !Objects.equals(cachedJobModelVersion, latestVersion);
 if (cacheIsStale) {
  // Record the changelog partition id of every task found in the published model.
  zkUtils.getJobModel(latestVersion)
    .getContainers()
    .values()
    .forEach(container -> container.getTasks().forEach(
      (name, task) -> changeLogPartitionMap.put(name, task.getChangelogPartition().getPartitionId())));
  cachedJobModelVersion = latestVersion;
 }
 GrouperMetadata metadata = getGrouperMetadata(latestVersion, processorNodes);
 JobModel generated = JobModelManager.readJobModel(config, changeLogPartitionMap, streamMetadataCache, metadata);
 return new JobModel(new MapConfig(), generated.getContainers());
}

代码示例来源:origin: org.apache.samza/samza-core_2.12

/**
 * Invoked when there is a change to the JobModelVersion z-node. It signifies that a new JobModel version is available.
 *
 * <p>The work is handed to {@code debounceTimer} under the {@code JOB_MODEL_VERSION_CHANGE} action with a
 * delay argument of 0. The scheduled action reads the JobModel for the new version from {@code zkUtils};
 * if that model has no container keyed by this processor's id it calls {@code stop()}, otherwise it notifies
 * the coordinator listener (when one is set) and joins the barrier for the new version.
 *
 * @param dataPath path of the changed z-node; used only for logging here
 * @param data     new z-node data; cast to {@code String} and treated as the JobModel version
 */
@Override
public void doHandleDataChange(String dataPath, Object data) {
 debounceTimer.scheduleAfterDebounceTime(JOB_MODEL_VERSION_CHANGE, 0, () -> {
   String jobModelVersion = (String) data;
   LOG.info("Got a notification for new JobModel version. Path = {} Version = {}", dataPath, data);
   newJobModel = zkUtils.getJobModel(jobModelVersion);
   // Parameterized form renders the same text as before but skips string building when INFO is disabled.
   LOG.info("pid={}: new JobModel is available. Version ={}; JobModel = {}", processorId, jobModelVersion, newJobModel);
   if (!newJobModel.getContainers().containsKey(processorId)) {
    LOG.info("New JobModel does not contain pid={}. Stopping this processor. New JobModel: {}",
      processorId, newJobModel);
    stop();
   } else {
    // stop current work
    if (coordinatorListener != null) {
     coordinatorListener.onJobModelExpired();
    }
    // update ZK and wait for all the processors to get this new version
    barrier.join(jobModelVersion, processorId);
   }
  });
}

代码示例来源:origin: org.apache.samza/samza-core_2.10

/**
 * Produces a new JobModel for the given processors. Called when this processor becomes the leader
 * or when the list of processors changes.
 *
 * @param processors the processors handed to {@code JobModelManager.readJobModel}
 * @return a JobModel carrying the generated containers and an empty {@code MapConfig}
 */
private JobModel generateNewJobModel(List<String> processors) {
 final String latestVersion = zkUtils.getJobModelVersion();
 // Refresh only when ZooKeeper holds a version and it differs from the one cached locally.
 final boolean cacheIsStale = latestVersion != null && !Objects.equals(cachedJobModelVersion, latestVersion);
 if (cacheIsStale) {
  // Record the changelog partition id of every task found in the published model.
  zkUtils.getJobModel(latestVersion)
    .getContainers()
    .values()
    .forEach(container -> container.getTasks().forEach(
      (name, task) -> changeLogPartitionMap.put(name, task.getChangelogPartition().getPartitionId())));
  cachedJobModelVersion = latestVersion;
 }
 // Host affinity is not supported in standalone. Hence, LocalityManager (which is responsible for
 // container to host mapping) is passed in as null when building the jobModel.
 JobModel generated = JobModelManager.readJobModel(this.config, changeLogPartitionMap, null, streamMetadataCache, processors);
 return new JobModel(new MapConfig(), generated.getContainers());
}

代码示例来源:origin: org.apache.samza/samza-core_2.10

/**
 * Invoked when there is a change to the JobModelVersion z-node. It signifies that a new JobModel version is available.
 *
 * <p>The work is handed to {@code debounceTimer} under the {@code JOB_MODEL_VERSION_CHANGE} action with a
 * delay argument of 0. The scheduled action reads the JobModel for the new version from {@code zkUtils};
 * if that model has no container keyed by this processor's id it calls {@code stop()}, otherwise it notifies
 * the coordinator listener (when one is set) and joins the barrier for the new version.
 *
 * @param dataPath path of the changed z-node; used only for logging here
 * @param data     new z-node data; cast to {@code String} and treated as the JobModel version
 */
@Override
public void doHandleDataChange(String dataPath, Object data) {
 debounceTimer.scheduleAfterDebounceTime(JOB_MODEL_VERSION_CHANGE, 0, () -> {
   String jobModelVersion = (String) data;
   LOG.info("Got a notification for new JobModel version. Path = {} Version = {}", dataPath, data);
   newJobModel = zkUtils.getJobModel(jobModelVersion);
   // Parameterized form renders the same text as before but skips string building when INFO is disabled.
   LOG.info("pid={}: new JobModel is available. Version ={}; JobModel = {}", processorId, jobModelVersion, newJobModel);
   if (!newJobModel.getContainers().containsKey(processorId)) {
    LOG.info("New JobModel does not contain pid={}. Stopping this processor. New JobModel: {}",
      processorId, newJobModel);
    stop();
   } else {
    // stop current work
    if (coordinatorListener != null) {
     coordinatorListener.onJobModelExpired();
    }
    // update ZK and wait for all the processors to get this new version
    barrier.join(jobModelVersion, processorId);
   }
  });
}

代码示例来源:origin: org.apache.samza/samza-core

/**
 * Generates the JobModel for the supplied processors; used on becoming leader or when the
 * processor list changes.
 *
 * @param processors the processors handed to {@code JobModelManager.readJobModel}
 * @return a new JobModel built from an empty {@code MapConfig} and the generated containers
 */
private JobModel generateNewJobModel(List<String> processors) {
 String versionInZk = zkUtils.getJobModelVersion();
 boolean versionPublished = versionInZk != null;
 // A published version that differs from the cached one means the changelog mapping must be re-read.
 if (versionPublished && !Objects.equals(cachedJobModelVersion, versionInZk)) {
  JobModel publishedModel = zkUtils.getJobModel(versionInZk);
  publishedModel.getContainers().values().forEach(container ->
    container.getTasks().forEach((name, task) ->
      changeLogPartitionMap.put(name, task.getChangelogPartition().getPartitionId())));
  cachedJobModelVersion = versionInZk;
 }
 // Host affinity is not supported in standalone, so null is passed where the LocalityManager
 // (responsible for container to host mapping) would go.
 JobModel freshModel = JobModelManager.readJobModel(this.config, changeLogPartitionMap, null, streamMetadataCache, processors);
 return new JobModel(new MapConfig(), freshModel.getContainers());
}

代码示例来源:origin: org.apache.samza/samza-core

/**
 * Invoked when there is a change to the JobModelVersion z-node. It signifies that a new JobModel version is available.
 *
 * <p>The work is handed to {@code debounceTimer} under the {@code JOB_MODEL_VERSION_CHANGE} action with a
 * delay argument of 0. The scheduled action reads the JobModel for the new version from {@code zkUtils};
 * if that model has no container keyed by this processor's id it calls {@code stop()}, otherwise it notifies
 * the coordinator listener (when one is set) and joins the barrier for the new version.
 *
 * @param dataPath path of the changed z-node; used only for logging here
 * @param data     new z-node data; cast to {@code String} and treated as the JobModel version
 */
@Override
public void doHandleDataChange(String dataPath, Object data) {
 debounceTimer.scheduleAfterDebounceTime(JOB_MODEL_VERSION_CHANGE, 0, () -> {
   String jobModelVersion = (String) data;
   LOG.info("Got a notification for new JobModel version. Path = {} Version = {}", dataPath, data);
   newJobModel = zkUtils.getJobModel(jobModelVersion);
   // Parameterized form renders the same text as before but skips string building when INFO is disabled.
   LOG.info("pid={}: new JobModel is available. Version ={}; JobModel = {}", processorId, jobModelVersion, newJobModel);
   if (!newJobModel.getContainers().containsKey(processorId)) {
    LOG.info("New JobModel does not contain pid={}. Stopping this processor. New JobModel: {}",
      processorId, newJobModel);
    stop();
   } else {
    // stop current work
    if (coordinatorListener != null) {
     coordinatorListener.onJobModelExpired();
    }
    // update ZK and wait for all the processors to get this new version
    barrier.join(jobModelVersion, processorId);
   }
  });
}

代码示例来源:origin: apache/samza

/**
 * Publishes JobModel versions through {@code zkUtils} and checks that the stored version is read back,
 * that publishing with a stale old version raises a {@code SamzaException}, and that a published
 * JobModel is read back equal to the one written.
 */
@Test
public void testPublishNewJobModel() {
 ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test");
 String root = keyBuilder.getRootPath();
 // start from a clean tree
 zkClient.deleteRecursive(root);
 String version = "1";
 String oldVersion = "0";
 zkUtils.validatePaths(new String[]{root, keyBuilder.getJobModelPathPrefix(), keyBuilder.getJobModelVersionPath()});
 zkUtils.publishJobModelVersion(oldVersion, version);
 Assert.assertEquals(version, zkUtils.getJobModelVersion());
 // parseLong yields a primitive directly; valueOf would box a Long only to unbox it for the addition
 String newerVersion = Long.toString(Long.parseLong(version) + 1);
 zkUtils.publishJobModelVersion(version, newerVersion);
 Assert.assertEquals(newerVersion, zkUtils.getJobModelVersion());
 try {
  zkUtils.publishJobModelVersion(oldVersion, "10"); //invalid new version
  Assert.fail("publish invalid version should've failed");
 } catch (SamzaException e) {
  // expected
 }
 // create job model
 Map<String, String> configMap = new HashMap<>();
 Map<String, ContainerModel> containers = new HashMap<>();
 MapConfig config = new MapConfig(configMap);
 JobModel jobModel = new JobModel(config, containers);
 zkUtils.publishJobModel(version, jobModel);
 Assert.assertEquals(jobModel, zkUtils.getJobModel(version));
}

代码示例来源:origin: apache/samza

/**
 * Builds the {@link GrouperMetadataImpl} based upon the provided {@code jobModelVersion}
 * and {@code processorNodes}.
 *
 * <p>When {@code jobModelVersion} is non-null, the JobModel of that version is read through
 * {@code zkUtils} and used to fill the task-to-processor and task-to-partitions mappings; otherwise
 * both mappings stay empty. Processor locality is taken from the supplied nodes and task locality is
 * read through {@code zkUtils.readTaskLocality()}.
 *
 * @param jobModelVersion the most recent jobModelVersion available in the zookeeper.
 * @param processorNodes the list of live processors in the zookeeper.
 * @return the built grouper metadata.
 */
private GrouperMetadataImpl getGrouperMetadata(String jobModelVersion, List<ProcessorNode> processorNodes) {
 Map<TaskName, String> taskToProcessorId = new HashMap<>();
 Map<TaskName, List<SystemStreamPartition>> taskToSSPs = new HashMap<>();
 if (jobModelVersion != null) {
  JobModel jobModel = zkUtils.getJobModel(jobModelVersion);
  for (ContainerModel containerModel : jobModel.getContainers().values()) {
   for (TaskModel taskModel : containerModel.getTasks().values()) {
    TaskName taskName = taskModel.getTaskName();
    taskToProcessorId.put(taskName, containerModel.getId());
    for (SystemStreamPartition partition : taskModel.getSystemStreamPartitions()) {
     // computeIfAbsent returns the list, so no second lookup is needed. It stays inside the loop so
     // that a task without partitions still gets no entry, as before.
     taskToSSPs.computeIfAbsent(taskName, k -> new ArrayList<>()).add(partition);
    }
   }
  }
 }
 Map<String, LocationId> processorLocality = new HashMap<>();
 for (ProcessorNode processorNode : processorNodes) {
  ProcessorData processorData = processorNode.getProcessorData();
  processorLocality.put(processorData.getProcessorId(), processorData.getLocationId());
 }
 Map<TaskName, LocationId> taskLocality = zkUtils.readTaskLocality();
 return new GrouperMetadataImpl(processorLocality, taskLocality, taskToSSPs, taskToProcessorId);
}

代码示例来源:origin: apache/samza

/**
 * Stopping the job coordinator must also stop the partition count monitor assigned to it.
 */
@Test
 public void testShouldStopPartitionCountMonitorWhenStoppingTheJobCoordinator() {
  // collaborators
  ZkClient zkClientMock = Mockito.mock(ZkClient.class);
  ZkKeyBuilder keyBuilderMock = Mockito.mock(ZkKeyBuilder.class);
  ZkUtils zkUtilsMock = Mockito.mock(ZkUtils.class);
  ScheduleAfterDebounceTime debounceTimerMock = Mockito.mock(ScheduleAfterDebounceTime.class);
  StreamPartitionCountMonitor monitorMock = Mockito.mock(StreamPartitionCountMonitor.class);

  // stubbing
  when(keyBuilderMock.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
  when(zkUtilsMock.getKeyBuilder()).thenReturn(keyBuilderMock);
  when(zkUtilsMock.getZkClient()).thenReturn(zkClientMock);
  when(zkUtilsMock.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));

  // coordinator under test, wired with the mocked timer and monitor
  ZkJobCoordinator coordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtilsMock));
  coordinator.debounceTimer = debounceTimerMock;
  coordinator.streamPartitionCountMonitor = monitorMock;

  coordinator.stop();

  Mockito.verify(monitorMock).stop();
 }
}

代码示例来源:origin: apache/samza

/**
 * Becoming leader must start the partition count monitor supplied by the coordinator.
 */
@Test
public void testShouldStartPartitionCountMonitorOnBecomingLeader() {
 ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
 ZkClient mockZkClient = Mockito.mock(ZkClient.class);
 when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
 ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
 when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
 when(zkUtils.getZkClient()).thenReturn(mockZkClient);
 when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
 ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
 ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
 StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class);
 zkJobCoordinator.debounceTimer = mockDebounceTimer;
 zkJobCoordinator.streamPartitionCountMonitor = monitor;
 // zkJobCoordinator is a spy: when(spy.method()) would run the real getPartitionCountMonitor() while
 // stubbing. doReturn(..).when(spy) installs the stub without invoking the real method.
 Mockito.doReturn(monitor).when(zkJobCoordinator).getPartitionCountMonitor();
 ZkJobCoordinator.LeaderElectorListenerImpl listener = zkJobCoordinator.new LeaderElectorListenerImpl();
 listener.onBecomingLeader();
 Mockito.verify(monitor).start();
}

代码示例来源:origin: apache/samza

/**
 * An expired ZooKeeper session must stop the partition count monitor assigned to the coordinator.
 */
@Test
public void testShouldStopPartitionCountMonitorOnSessionExpiration() {
 // collaborators
 ZkClient zkClientMock = Mockito.mock(ZkClient.class);
 ZkKeyBuilder keyBuilderMock = Mockito.mock(ZkKeyBuilder.class);
 ZkUtils zkUtilsMock = Mockito.mock(ZkUtils.class);
 ScheduleAfterDebounceTime debounceTimerMock = Mockito.mock(ScheduleAfterDebounceTime.class);
 StreamPartitionCountMonitor monitorMock = Mockito.mock(StreamPartitionCountMonitor.class);

 // stubbing
 when(keyBuilderMock.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
 when(zkUtilsMock.getKeyBuilder()).thenReturn(keyBuilderMock);
 when(zkUtilsMock.getZkClient()).thenReturn(zkClientMock);
 when(zkUtilsMock.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));

 // coordinator under test, wired with the mocked timer and monitor
 ZkJobCoordinator coordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtilsMock));
 coordinator.debounceTimer = debounceTimerMock;
 coordinator.streamPartitionCountMonitor = monitorMock;

 // simulate session expiry
 ZkSessionStateChangedListener sessionListener = coordinator.new ZkSessionStateChangedListener();
 sessionListener.handleStateChanged(Watcher.Event.KeeperState.Expired);

 Mockito.verify(monitorMock).stop();
}

代码示例来源:origin: apache/samza

/**
 * A processor that is absent from the newly published JobModel (here: a model with no containers)
 * is expected to have {@code stop()} invoked on its coordinator.
 */
@Test
public void testFollowerShouldStopWhenNotPartOfGeneratedJobModel() throws Exception {
 CountDownLatch stopInvoked = new CountDownLatch(1);

 // collaborators
 ZkClient zkClientMock = Mockito.mock(ZkClient.class);
 ZkKeyBuilder keyBuilderMock = Mockito.mock(ZkKeyBuilder.class);
 ZkUtils zkUtilsMock = Mockito.mock(ZkUtils.class);

 // stubbing
 when(keyBuilderMock.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
 when(zkUtilsMock.getKeyBuilder()).thenReturn(keyBuilderMock);
 when(zkUtilsMock.getZkClient()).thenReturn(zkClientMock);
 when(zkUtilsMock.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));

 ZkJobCoordinator coordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtilsMock));
 // replace stop() on the spy: release the latch instead of running the real method
 doAnswer(invocation -> {
  stopInvoked.countDown();
  return null;
 }).when(coordinator).stop();

 ZkJobCoordinator.ZkJobModelVersionChangeHandler versionChangeHandler = coordinator.new ZkJobModelVersionChangeHandler(zkUtilsMock);
 versionChangeHandler.doHandleDataChange("path", TEST_JOB_MODEL_VERSION);

 verify(coordinator, Mockito.atMost(1)).stop();
 assertTrue("Timed out waiting for JobCoordinator to stop", stopInvoked.await(1, TimeUnit.MINUTES));
}

代码示例来源:origin: apache/samza

/**
 * On session expiry the coordinator is expected to bump the generation, cancel all scheduled
 * debounce actions, schedule the {@code ZK_SESSION_EXPIRED} action and count one session expiration.
 */
@Test
public void testShouldRemoveBufferedEventsInDebounceQueueOnSessionExpiration() {
 // collaborators
 ZkClient zkClientMock = Mockito.mock(ZkClient.class);
 ZkKeyBuilder keyBuilderMock = Mockito.mock(ZkKeyBuilder.class);
 ZkUtils zkUtilsMock = Mockito.mock(ZkUtils.class);
 ScheduleAfterDebounceTime debounceTimerMock = Mockito.mock(ScheduleAfterDebounceTime.class);

 // stubbing
 when(keyBuilderMock.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
 when(zkUtilsMock.getKeyBuilder()).thenReturn(keyBuilderMock);
 when(zkUtilsMock.getZkClient()).thenReturn(zkClientMock);
 when(zkUtilsMock.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));

 // coordinator under test, wired with the mocked timer and fresh session metrics
 ZkJobCoordinator coordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtilsMock));
 coordinator.debounceTimer = debounceTimerMock;
 coordinator.zkSessionMetrics = new ZkSessionMetrics(new MetricsRegistryMap());

 // simulate session expiry
 ZkSessionStateChangedListener sessionListener = coordinator.new ZkSessionStateChangedListener();
 sessionListener.handleStateChanged(Watcher.Event.KeeperState.Expired);

 verify(zkUtilsMock).incGeneration();
 verify(debounceTimerMock).cancelAllScheduledActions();
 verify(debounceTimerMock).scheduleAfterDebounceTime(Mockito.eq("ZK_SESSION_EXPIRED"), Mockito.eq(0L), Mockito.any(Runnable.class));
 Assert.assertEquals(1, coordinator.zkSessionMetrics.zkSessionExpirations.getCount());
}

代码示例来源:origin: apache/samza

/**
 * Feeds Disconnected, SyncConnected and AuthFailed state changes plus a session establishment error
 * to the session listener and checks the disconnect, sync-connected and error counters.
 */
@Test
public void testZookeeperSessionMetricsAreUpdatedCoorrectly() {
 // collaborators
 ZkClient zkClientMock = Mockito.mock(ZkClient.class);
 ZkKeyBuilder keyBuilderMock = Mockito.mock(ZkKeyBuilder.class);
 ZkUtils zkUtilsMock = Mockito.mock(ZkUtils.class);
 ScheduleAfterDebounceTime debounceTimerMock = Mockito.mock(ScheduleAfterDebounceTime.class);

 // stubbing
 when(keyBuilderMock.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
 when(zkUtilsMock.getKeyBuilder()).thenReturn(keyBuilderMock);
 when(zkUtilsMock.getZkClient()).thenReturn(zkClientMock);
 when(zkUtilsMock.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));

 // coordinator under test, wired with the mocked timer and fresh session metrics
 ZkJobCoordinator coordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtilsMock));
 coordinator.debounceTimer = debounceTimerMock;
 coordinator.zkSessionMetrics = new ZkSessionMetrics(new MetricsRegistryMap());
 ZkSessionStateChangedListener sessionListener = coordinator.new ZkSessionStateChangedListener();

 // three state transitions; the error counter is expected at 1 once they are delivered
 sessionListener.handleStateChanged(Watcher.Event.KeeperState.Disconnected);
 sessionListener.handleStateChanged(Watcher.Event.KeeperState.SyncConnected);
 sessionListener.handleStateChanged(Watcher.Event.KeeperState.AuthFailed);
 Assert.assertEquals(1, coordinator.zkSessionMetrics.zkSessionErrors.getCount());

 // a session establishment error is expected to raise the error counter to 2
 sessionListener.handleSessionEstablishmentError(new SamzaException("Test exception"));
 Assert.assertEquals(1, coordinator.zkSessionMetrics.zkSessionDisconnects.getCount());
 Assert.assertEquals(1, coordinator.zkSessionMetrics.zkSyncConnected.getCount());
 Assert.assertEquals(2, coordinator.zkSessionMetrics.zkSessionErrors.getCount());
}

相关文章