本文整理了Java中org.apache.samza.zk.ZkJobCoordinator.<init>()
方法的一些代码示例,展示了ZkJobCoordinator.<init>()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。ZkJobCoordinator.<init>()
方法的具体详情如下:
包路径:org.apache.samza.zk.ZkJobCoordinator
类名称:ZkJobCoordinator
方法名:<init>
暂无
代码示例来源:origin: apache/samza
/**
* Instantiates an {@link ZkJobCoordinator} using the {@link Config}.
*
* @param config zookeeper configurations required for instantiating {@link ZkJobCoordinator}
* @return An instance of {@link ZkJobCoordinator}
*/
@Override
public JobCoordinator getJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry) {
// TODO: Separate JC related configs into a "ZkJobCoordinatorConfig"
String jobCoordinatorZkBasePath = getJobCoordinationZkPath(config);
ZkUtils zkUtils = getZkUtils(config, metricsRegistry, jobCoordinatorZkBasePath);
LOG.debug("Creating ZkJobCoordinator with config: {}.", config);
return new ZkJobCoordinator(processorId, config, metricsRegistry, zkUtils);
}
代码示例来源:origin: org.apache.samza/samza-core_2.10
/**
* Instantiates an {@link ZkJobCoordinator} using the {@link Config}.
*
* @param config zookeeper configurations required for instantiating {@link ZkJobCoordinator}
* @return An instance of {@link ZkJobCoordinator}
*/
@Override
public JobCoordinator getJobCoordinator(Config config) {
// TODO: Separate JC related configs into a "ZkJobCoordinatorConfig"
MetricsRegistry metricsRegistry = new MetricsRegistryMap();
String jobCoordinatorZkBasePath = getJobCoordinationZkPath(config);
ZkUtils zkUtils = getZkUtils(config, metricsRegistry, jobCoordinatorZkBasePath);
LOG.debug("Creating ZkJobCoordinator with config: {}.", config);
return new ZkJobCoordinator(config, metricsRegistry, zkUtils);
}
代码示例来源:origin: org.apache.samza/samza-core
/**
* Instantiates an {@link ZkJobCoordinator} using the {@link Config}.
*
* @param config zookeeper configurations required for instantiating {@link ZkJobCoordinator}
* @return An instance of {@link ZkJobCoordinator}
*/
@Override
public JobCoordinator getJobCoordinator(Config config) {
// TODO: Separate JC related configs into a "ZkJobCoordinatorConfig"
MetricsRegistry metricsRegistry = new MetricsRegistryMap();
String jobCoordinatorZkBasePath = getJobCoordinationZkPath(config);
ZkUtils zkUtils = getZkUtils(config, metricsRegistry, jobCoordinatorZkBasePath);
LOG.debug("Creating ZkJobCoordinator with config: {}.", config);
return new ZkJobCoordinator(config, metricsRegistry, zkUtils);
}
代码示例来源:origin: org.apache.samza/samza-core_2.12
/**
* Instantiates an {@link ZkJobCoordinator} using the {@link Config}.
*
* @param config zookeeper configurations required for instantiating {@link ZkJobCoordinator}
* @return An instance of {@link ZkJobCoordinator}
*/
@Override
public JobCoordinator getJobCoordinator(Config config) {
// TODO: Separate JC related configs into a "ZkJobCoordinatorConfig"
MetricsRegistry metricsRegistry = new MetricsRegistryMap();
String jobCoordinatorZkBasePath = getJobCoordinationZkPath(config);
ZkUtils zkUtils = getZkUtils(config, metricsRegistry, jobCoordinatorZkBasePath);
LOG.debug("Creating ZkJobCoordinator with config: {}.", config);
return new ZkJobCoordinator(config, metricsRegistry, zkUtils);
}
代码示例来源:origin: org.apache.samza/samza-core_2.11
/**
* Instantiates an {@link ZkJobCoordinator} using the {@link Config}.
*
* @param config zookeeper configurations required for instantiating {@link ZkJobCoordinator}
* @return An instance of {@link ZkJobCoordinator}
*/
@Override
public JobCoordinator getJobCoordinator(Config config) {
// TODO: Separate JC related configs into a "ZkJobCoordinatorConfig"
MetricsRegistry metricsRegistry = new MetricsRegistryMap();
String jobCoordinatorZkBasePath = getJobCoordinationZkPath(config);
ZkUtils zkUtils = getZkUtils(config, metricsRegistry, jobCoordinatorZkBasePath);
LOG.debug("Creating ZkJobCoordinator with config: {}.", config);
return new ZkJobCoordinator(config, metricsRegistry, zkUtils);
}
代码示例来源:origin: apache/samza
@Test
public void testShouldStartPartitionCountMonitorOnBecomingLeader() {
ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
ZkClient mockZkClient = Mockito.mock(ZkClient.class);
when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
when(zkUtils.getZkClient()).thenReturn(mockZkClient);
when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class);
zkJobCoordinator.debounceTimer = mockDebounceTimer;
zkJobCoordinator.streamPartitionCountMonitor = monitor;
when(zkJobCoordinator.getPartitionCountMonitor()).thenReturn(monitor);
ZkJobCoordinator.LeaderElectorListenerImpl listener = zkJobCoordinator.new LeaderElectorListenerImpl();
listener.onBecomingLeader();
Mockito.verify(monitor).start();
}
代码示例来源:origin: apache/samza
@Test
public void testShouldStopPartitionCountMonitorWhenStoppingTheJobCoordinator() {
ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
ZkClient mockZkClient = Mockito.mock(ZkClient.class);
when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
when(zkUtils.getZkClient()).thenReturn(mockZkClient);
when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class);
zkJobCoordinator.debounceTimer = mockDebounceTimer;
zkJobCoordinator.streamPartitionCountMonitor = monitor;
zkJobCoordinator.stop();
Mockito.verify(monitor).stop();
}
}
代码示例来源:origin: apache/samza
@Test
public void testShouldStopPartitionCountMonitorOnSessionExpiration() {
ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
ZkClient mockZkClient = Mockito.mock(ZkClient.class);
when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
when(zkUtils.getZkClient()).thenReturn(mockZkClient);
when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class);
zkJobCoordinator.debounceTimer = mockDebounceTimer;
zkJobCoordinator.streamPartitionCountMonitor = monitor;
ZkSessionStateChangedListener zkSessionStateChangedListener = zkJobCoordinator.new ZkSessionStateChangedListener();
zkSessionStateChangedListener.handleStateChanged(Watcher.Event.KeeperState.Expired);
Mockito.verify(monitor).stop();
}
代码示例来源:origin: apache/samza
@Test
public void testFollowerShouldStopWhenNotPartOfGeneratedJobModel() throws Exception {
ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
ZkClient mockZkClient = Mockito.mock(ZkClient.class);
CountDownLatch jcShutdownLatch = new CountDownLatch(1);
when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
when(zkUtils.getZkClient()).thenReturn(mockZkClient);
when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
doAnswer(new Answer<Void>() {
public Void answer(InvocationOnMock invocation) {
jcShutdownLatch.countDown();
return null;
}
}).when(zkJobCoordinator).stop();
final ZkJobCoordinator.ZkJobModelVersionChangeHandler zkJobModelVersionChangeHandler = zkJobCoordinator.new ZkJobModelVersionChangeHandler(zkUtils);
zkJobModelVersionChangeHandler.doHandleDataChange("path", TEST_JOB_MODEL_VERSION);
verify(zkJobCoordinator, Mockito.atMost(1)).stop();
assertTrue("Timed out waiting for JobCoordinator to stop", jcShutdownLatch.await(1, TimeUnit.MINUTES));
}
代码示例来源:origin: apache/samza
@Test
public void testShouldRemoveBufferedEventsInDebounceQueueOnSessionExpiration() {
ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
ZkClient mockZkClient = Mockito.mock(ZkClient.class);
when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
when(zkUtils.getZkClient()).thenReturn(mockZkClient);
when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
zkJobCoordinator.debounceTimer = mockDebounceTimer;
zkJobCoordinator.zkSessionMetrics = new ZkSessionMetrics(new MetricsRegistryMap());
final ZkSessionStateChangedListener zkSessionStateChangedListener = zkJobCoordinator.new ZkSessionStateChangedListener();
zkSessionStateChangedListener.handleStateChanged(Watcher.Event.KeeperState.Expired);
verify(zkUtils).incGeneration();
verify(mockDebounceTimer).cancelAllScheduledActions();
verify(mockDebounceTimer).scheduleAfterDebounceTime(Mockito.eq("ZK_SESSION_EXPIRED"), Mockito.eq(0L), Mockito.any(Runnable.class));
Assert.assertEquals(1, zkJobCoordinator.zkSessionMetrics.zkSessionExpirations.getCount());
}
代码示例来源:origin: apache/samza
@Test
public void testZookeeperSessionMetricsAreUpdatedCoorrectly() {
ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
ZkClient mockZkClient = Mockito.mock(ZkClient.class);
when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
when(zkUtils.getZkClient()).thenReturn(mockZkClient);
when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
zkJobCoordinator.debounceTimer = mockDebounceTimer;
zkJobCoordinator.zkSessionMetrics = new ZkSessionMetrics(new MetricsRegistryMap());
final ZkSessionStateChangedListener zkSessionStateChangedListener = zkJobCoordinator.new ZkSessionStateChangedListener();
zkSessionStateChangedListener.handleStateChanged(Watcher.Event.KeeperState.Disconnected);
zkSessionStateChangedListener.handleStateChanged(Watcher.Event.KeeperState.SyncConnected);
zkSessionStateChangedListener.handleStateChanged(Watcher.Event.KeeperState.AuthFailed);
Assert.assertEquals(1, zkJobCoordinator.zkSessionMetrics.zkSessionErrors.getCount());
zkSessionStateChangedListener.handleSessionEstablishmentError(new SamzaException("Test exception"));
Assert.assertEquals(1, zkJobCoordinator.zkSessionMetrics.zkSessionDisconnects.getCount());
Assert.assertEquals(1, zkJobCoordinator.zkSessionMetrics.zkSyncConnected.getCount());
Assert.assertEquals(2, zkJobCoordinator.zkSessionMetrics.zkSessionErrors.getCount());
}
内容来源于网络,如有侵权,请联系作者删除!