org.apache.samza.zk.ZkJobCoordinator.<init>()方法的使用及代码示例

x33g5p2x  于2022-02-05 转载在 其他  
字(11.7k)|赞(0)|评价(0)|浏览(104)

本文整理了Java中org.apache.samza.zk.ZkJobCoordinator.<init>()方法的一些代码示例,展示了ZkJobCoordinator.<init>()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ZkJobCoordinator.<init>()方法的具体详情如下:
包路径:org.apache.samza.zk.ZkJobCoordinator
类名称:ZkJobCoordinator
方法名:<init>

ZkJobCoordinator.<init>介绍

暂无

代码示例

代码示例来源:origin: apache/samza

/**
 * Creates a {@link ZkJobCoordinator} for the given processor.
 *
 * <p>The coordination base path is resolved from the {@link Config} through
 * {@code getJobCoordinationZkPath}, and the {@link ZkUtils} handed to the coordinator is
 * obtained from {@code getZkUtils} using that path.
 *
 * @param processorId identifier passed through to the {@link ZkJobCoordinator} constructor
 * @param config zookeeper configurations required for instantiating {@link ZkJobCoordinator}
 * @param metricsRegistry registry passed to both {@code getZkUtils} and the coordinator
 * @return a new instance of {@link ZkJobCoordinator}
 */
@Override
public JobCoordinator getJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry) {
 // TODO: Separate JC related configs into a "ZkJobCoordinatorConfig"
 final ZkUtils coordinatorZkUtils = getZkUtils(config, metricsRegistry, getJobCoordinationZkPath(config));
 LOG.debug("Creating ZkJobCoordinator with config: {}.", config);
 final ZkJobCoordinator coordinator =
   new ZkJobCoordinator(processorId, config, metricsRegistry, coordinatorZkUtils);
 return coordinator;
}

代码示例来源:origin: org.apache.samza/samza-core_2.10

/**
 * Creates a {@link ZkJobCoordinator} from the supplied {@link Config}.
 *
 * <p>A fresh {@link MetricsRegistryMap} is created on every call and shared between the
 * {@link ZkUtils} returned by {@code getZkUtils} and the coordinator itself.
 *
 * @param config zookeeper configurations required for instantiating {@link ZkJobCoordinator}
 * @return a new instance of {@link ZkJobCoordinator}
 */
@Override
public JobCoordinator getJobCoordinator(Config config) {
 // TODO: Separate JC related configs into a "ZkJobCoordinatorConfig"
 final MetricsRegistry registry = new MetricsRegistryMap();
 final ZkUtils coordinatorZkUtils = getZkUtils(config, registry, getJobCoordinationZkPath(config));
 LOG.debug("Creating ZkJobCoordinator with config: {}.", config);
 final ZkJobCoordinator coordinator = new ZkJobCoordinator(config, registry, coordinatorZkUtils);
 return coordinator;
}

代码示例来源:origin: org.apache.samza/samza-core

/**
 * Creates a {@link ZkJobCoordinator} from the supplied {@link Config}.
 *
 * <p>A fresh {@link MetricsRegistryMap} is created on every call and shared between the
 * {@link ZkUtils} returned by {@code getZkUtils} and the coordinator itself.
 *
 * @param config zookeeper configurations required for instantiating {@link ZkJobCoordinator}
 * @return a new instance of {@link ZkJobCoordinator}
 */
@Override
public JobCoordinator getJobCoordinator(Config config) {
 // TODO: Separate JC related configs into a "ZkJobCoordinatorConfig"
 final MetricsRegistry registry = new MetricsRegistryMap();
 final ZkUtils coordinatorZkUtils = getZkUtils(config, registry, getJobCoordinationZkPath(config));
 LOG.debug("Creating ZkJobCoordinator with config: {}.", config);
 final ZkJobCoordinator coordinator = new ZkJobCoordinator(config, registry, coordinatorZkUtils);
 return coordinator;
}

代码示例来源:origin: org.apache.samza/samza-core_2.12

/**
 * Creates a {@link ZkJobCoordinator} from the supplied {@link Config}.
 *
 * <p>A fresh {@link MetricsRegistryMap} is created on every call and shared between the
 * {@link ZkUtils} returned by {@code getZkUtils} and the coordinator itself.
 *
 * @param config zookeeper configurations required for instantiating {@link ZkJobCoordinator}
 * @return a new instance of {@link ZkJobCoordinator}
 */
@Override
public JobCoordinator getJobCoordinator(Config config) {
 // TODO: Separate JC related configs into a "ZkJobCoordinatorConfig"
 final MetricsRegistry registry = new MetricsRegistryMap();
 final ZkUtils coordinatorZkUtils = getZkUtils(config, registry, getJobCoordinationZkPath(config));
 LOG.debug("Creating ZkJobCoordinator with config: {}.", config);
 final ZkJobCoordinator coordinator = new ZkJobCoordinator(config, registry, coordinatorZkUtils);
 return coordinator;
}

代码示例来源:origin: org.apache.samza/samza-core_2.11

/**
 * Creates a {@link ZkJobCoordinator} from the supplied {@link Config}.
 *
 * <p>A fresh {@link MetricsRegistryMap} is created on every call and shared between the
 * {@link ZkUtils} returned by {@code getZkUtils} and the coordinator itself.
 *
 * @param config zookeeper configurations required for instantiating {@link ZkJobCoordinator}
 * @return a new instance of {@link ZkJobCoordinator}
 */
@Override
public JobCoordinator getJobCoordinator(Config config) {
 // TODO: Separate JC related configs into a "ZkJobCoordinatorConfig"
 final MetricsRegistry registry = new MetricsRegistryMap();
 final ZkUtils coordinatorZkUtils = getZkUtils(config, registry, getJobCoordinationZkPath(config));
 LOG.debug("Creating ZkJobCoordinator with config: {}.", config);
 final ZkJobCoordinator coordinator = new ZkJobCoordinator(config, registry, coordinatorZkUtils);
 return coordinator;
}

代码示例来源:origin: apache/samza

/**
 * Verifies that {@code LeaderElectorListenerImpl#onBecomingLeader()} starts the
 * {@link StreamPartitionCountMonitor} returned by the coordinator.
 *
 * <p>The coordinator is a Mockito spy built on mocked {@link ZkUtils}; its debounce timer and
 * partition count monitor fields are replaced with mocks before the listener is invoked.
 */
@Test
public void testShouldStartPartitionCountMonitorOnBecomingLeader() {
 ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
 ZkClient mockZkClient = Mockito.mock(ZkClient.class);
 when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
 ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
 when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
 when(zkUtils.getZkClient()).thenReturn(mockZkClient);
 when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
 ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
 ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
 StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class);
 zkJobCoordinator.debounceTimer = mockDebounceTimer;
 zkJobCoordinator.streamPartitionCountMonitor = monitor;
 // Stub the spy with doReturn(..).when(spy): when(spy.method()) would execute the real
 // getPartitionCountMonitor() while the stubbing is being recorded.
 Mockito.doReturn(monitor).when(zkJobCoordinator).getPartitionCountMonitor();
 ZkJobCoordinator.LeaderElectorListenerImpl listener = zkJobCoordinator.new LeaderElectorListenerImpl();
 listener.onBecomingLeader();
 Mockito.verify(monitor).start();
}

代码示例来源:origin: apache/samza

/**
 * Verifies that calling {@code stop()} on the coordinator also stops the
 * {@link StreamPartitionCountMonitor} assigned to it.
 */
@Test
 public void testShouldStopPartitionCountMonitorWhenStoppingTheJobCoordinator() {
  // Collaborator mocks.
  final ZkKeyBuilder mockKeyBuilder = Mockito.mock(ZkKeyBuilder.class);
  final ZkClient zkClient = Mockito.mock(ZkClient.class);
  final ZkUtils mockZkUtils = Mockito.mock(ZkUtils.class);
  final ScheduleAfterDebounceTime debounceTimerMock = Mockito.mock(ScheduleAfterDebounceTime.class);
  final StreamPartitionCountMonitor partitionCountMonitor = Mockito.mock(StreamPartitionCountMonitor.class);

  // Stubbing that has to be in place before the coordinator is constructed.
  Mockito.when(mockKeyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
  Mockito.when(mockZkUtils.getKeyBuilder()).thenReturn(mockKeyBuilder);
  Mockito.when(mockZkUtils.getZkClient()).thenReturn(zkClient);
  Mockito.when(mockZkUtils.getJobModel(TEST_JOB_MODEL_VERSION))
    .thenReturn(new JobModel(new MapConfig(), new HashMap<>()));

  final ZkJobCoordinator coordinatorSpy = Mockito.spy(
    new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), mockZkUtils));
  coordinatorSpy.debounceTimer = debounceTimerMock;
  coordinatorSpy.streamPartitionCountMonitor = partitionCountMonitor;

  coordinatorSpy.stop();

  Mockito.verify(partitionCountMonitor).stop();
 }
}

代码示例来源:origin: apache/samza

/**
 * Verifies that an {@code Expired} ZooKeeper session state delivered to
 * {@code ZkSessionStateChangedListener} stops the {@link StreamPartitionCountMonitor}.
 */
@Test
public void testShouldStopPartitionCountMonitorOnSessionExpiration() {
 // Collaborator mocks.
 final ZkKeyBuilder mockKeyBuilder = Mockito.mock(ZkKeyBuilder.class);
 final ZkClient zkClient = Mockito.mock(ZkClient.class);
 final ZkUtils mockZkUtils = Mockito.mock(ZkUtils.class);
 final ScheduleAfterDebounceTime debounceTimerMock = Mockito.mock(ScheduleAfterDebounceTime.class);
 final StreamPartitionCountMonitor partitionCountMonitor = Mockito.mock(StreamPartitionCountMonitor.class);

 // Stubbing that has to be in place before the coordinator is constructed.
 Mockito.when(mockKeyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
 Mockito.when(mockZkUtils.getKeyBuilder()).thenReturn(mockKeyBuilder);
 Mockito.when(mockZkUtils.getZkClient()).thenReturn(zkClient);
 Mockito.when(mockZkUtils.getJobModel(TEST_JOB_MODEL_VERSION))
   .thenReturn(new JobModel(new MapConfig(), new HashMap<>()));

 final ZkJobCoordinator coordinatorSpy = Mockito.spy(
   new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), mockZkUtils));
 coordinatorSpy.debounceTimer = debounceTimerMock;
 coordinatorSpy.streamPartitionCountMonitor = partitionCountMonitor;

 final ZkSessionStateChangedListener sessionListener = coordinatorSpy.new ZkSessionStateChangedListener();
 sessionListener.handleStateChanged(Watcher.Event.KeeperState.Expired);

 Mockito.verify(partitionCountMonitor).stop();
}

代码示例来源:origin: apache/samza

/**
 * Verifies that handling a job model version change leads to {@code stop()} being invoked on
 * the coordinator.
 *
 * <p>{@code stop()} is stubbed on the spy to count down a latch instead of running the real
 * method; the test then waits up to one minute for that latch.
 *
 * @throws Exception if the wait on the latch is interrupted
 */
@Test
public void testFollowerShouldStopWhenNotPartOfGeneratedJobModel() throws Exception {
 final CountDownLatch stopInvoked = new CountDownLatch(1);

 // Collaborator mocks.
 final ZkKeyBuilder mockKeyBuilder = Mockito.mock(ZkKeyBuilder.class);
 final ZkClient zkClient = Mockito.mock(ZkClient.class);
 final ZkUtils mockZkUtils = Mockito.mock(ZkUtils.class);

 // Stubbing that has to be in place before the coordinator is constructed.
 Mockito.when(mockKeyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
 Mockito.when(mockZkUtils.getKeyBuilder()).thenReturn(mockKeyBuilder);
 Mockito.when(mockZkUtils.getZkClient()).thenReturn(zkClient);
 Mockito.when(mockZkUtils.getJobModel(TEST_JOB_MODEL_VERSION))
   .thenReturn(new JobModel(new MapConfig(), new HashMap<>()));

 final ZkJobCoordinator coordinatorSpy = Mockito.spy(
   new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), mockZkUtils));

 // Replace stop() with a latch count-down so the test can observe the call.
 final Answer<Void> countDownOnStop = invocation -> {
  stopInvoked.countDown();
  return null;
 };
 doAnswer(countDownOnStop).when(coordinatorSpy).stop();

 final ZkJobCoordinator.ZkJobModelVersionChangeHandler versionChangeHandler =
   coordinatorSpy.new ZkJobModelVersionChangeHandler(mockZkUtils);
 versionChangeHandler.doHandleDataChange("path", TEST_JOB_MODEL_VERSION);

 Mockito.verify(coordinatorSpy, Mockito.atMost(1)).stop();
 final boolean stoppedInTime = stopInvoked.await(1, TimeUnit.MINUTES);
 assertTrue("Timed out waiting for JobCoordinator to stop", stoppedInTime);
}

代码示例来源:origin: apache/samza

/**
 * Verifies the reaction to an {@code Expired} ZooKeeper session state: the generation on
 * {@link ZkUtils} is incremented, all scheduled debounce actions are cancelled, a
 * {@code "ZK_SESSION_EXPIRED"} action is scheduled with zero delay, and the session
 * expiration counter reads 1.
 */
@Test
public void testShouldRemoveBufferedEventsInDebounceQueueOnSessionExpiration() {
 // Collaborator mocks.
 final ZkKeyBuilder mockKeyBuilder = Mockito.mock(ZkKeyBuilder.class);
 final ZkClient zkClient = Mockito.mock(ZkClient.class);
 final ZkUtils mockZkUtils = Mockito.mock(ZkUtils.class);
 final ScheduleAfterDebounceTime debounceTimerMock = Mockito.mock(ScheduleAfterDebounceTime.class);

 // Stubbing that has to be in place before the coordinator is constructed.
 Mockito.when(mockKeyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
 Mockito.when(mockZkUtils.getKeyBuilder()).thenReturn(mockKeyBuilder);
 Mockito.when(mockZkUtils.getZkClient()).thenReturn(zkClient);
 Mockito.when(mockZkUtils.getJobModel(TEST_JOB_MODEL_VERSION))
   .thenReturn(new JobModel(new MapConfig(), new HashMap<>()));

 final ZkJobCoordinator coordinatorSpy = Mockito.spy(
   new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), mockZkUtils));
 coordinatorSpy.debounceTimer = debounceTimerMock;
 coordinatorSpy.zkSessionMetrics = new ZkSessionMetrics(new MetricsRegistryMap());

 final ZkSessionStateChangedListener sessionListener = coordinatorSpy.new ZkSessionStateChangedListener();
 sessionListener.handleStateChanged(Watcher.Event.KeeperState.Expired);

 Mockito.verify(mockZkUtils).incGeneration();
 Mockito.verify(debounceTimerMock).cancelAllScheduledActions();
 Mockito.verify(debounceTimerMock).scheduleAfterDebounceTime(
   Mockito.eq("ZK_SESSION_EXPIRED"), Mockito.eq(0L), Mockito.any(Runnable.class));
 Assert.assertEquals(1, coordinatorSpy.zkSessionMetrics.zkSessionExpirations.getCount());
}

代码示例来源:origin: apache/samza

/**
 * Verifies the {@link ZkSessionMetrics} counters after a sequence of session events.
 *
 * <p>{@code Disconnected}, {@code SyncConnected} and {@code AuthFailed} states are delivered
 * first, after which the error counter is expected to be 1. A session establishment error is
 * then reported, after which the disconnect and sync-connected counters are expected to be 1
 * and the error counter 2.
 */
@Test
public void testZookeeperSessionMetricsAreUpdatedCoorrectly() {
 // Collaborator mocks.
 final ZkKeyBuilder mockKeyBuilder = Mockito.mock(ZkKeyBuilder.class);
 final ZkClient zkClient = Mockito.mock(ZkClient.class);
 final ZkUtils mockZkUtils = Mockito.mock(ZkUtils.class);
 final ScheduleAfterDebounceTime debounceTimerMock = Mockito.mock(ScheduleAfterDebounceTime.class);

 // Stubbing that has to be in place before the coordinator is constructed.
 Mockito.when(mockKeyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
 Mockito.when(mockZkUtils.getKeyBuilder()).thenReturn(mockKeyBuilder);
 Mockito.when(mockZkUtils.getZkClient()).thenReturn(zkClient);
 Mockito.when(mockZkUtils.getJobModel(TEST_JOB_MODEL_VERSION))
   .thenReturn(new JobModel(new MapConfig(), new HashMap<>()));

 final ZkJobCoordinator coordinatorSpy = Mockito.spy(
   new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), mockZkUtils));
 coordinatorSpy.debounceTimer = debounceTimerMock;
 coordinatorSpy.zkSessionMetrics = new ZkSessionMetrics(new MetricsRegistryMap());

 final ZkSessionStateChangedListener sessionListener = coordinatorSpy.new ZkSessionStateChangedListener();

 // Phase 1: three state transitions; the error counter is checked right after them.
 sessionListener.handleStateChanged(Watcher.Event.KeeperState.Disconnected);
 sessionListener.handleStateChanged(Watcher.Event.KeeperState.SyncConnected);
 sessionListener.handleStateChanged(Watcher.Event.KeeperState.AuthFailed);
 Assert.assertEquals(1, coordinatorSpy.zkSessionMetrics.zkSessionErrors.getCount());

 // Phase 2: a session establishment error is reported on top of the earlier events.
 sessionListener.handleSessionEstablishmentError(new SamzaException("Test exception"));
 Assert.assertEquals(1, coordinatorSpy.zkSessionMetrics.zkSessionDisconnects.getCount());
 Assert.assertEquals(1, coordinatorSpy.zkSessionMetrics.zkSyncConnected.getCount());
 Assert.assertEquals(2, coordinatorSpy.zkSessionMetrics.zkSessionErrors.getCount());
}

相关文章