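This article collects code examples for the Java class org.apache.samza.zk.ZkJobCoordinator and shows how the class is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven. They were extracted from selected projects and are intended as practical reference material. Details of the ZkJobCoordinator class follow: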
Fully qualified class name: org.apache.samza.zk.ZkJobCoordinator
Class name: ZkJobCoordinator
Description: JobCoordinator for standalone processors managed via ZooKeeper.
Code example source: apache/samza
/**
 * Instantiates an {@link ZkJobCoordinator} using the {@link Config}.
 *
 * @param config zookeeper configurations required for instantiating {@link ZkJobCoordinator}
 * @return An instance of {@link ZkJobCoordinator}
 */
@Override
public JobCoordinator getJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry) {
  // TODO: Separate JC related configs into a "ZkJobCoordinatorConfig"
  String jobCoordinatorZkBasePath = getJobCoordinationZkPath(config);
  ZkUtils zkUtils = getZkUtils(config, metricsRegistry, jobCoordinatorZkBasePath);
  LOG.debug("Creating ZkJobCoordinator with config: {}.", config);
  return new ZkJobCoordinator(processorId, config, metricsRegistry, zkUtils);
}
Code example source: apache/samza
@Override
public void onBarrierError(String version, Throwable t) {
  LOG.error("Encountered error while attaining consensus on JobModel version " + version);
  metrics.barrierError.inc();
  stop();
}
Code example source: org.apache.samza/samza-core
/**
 * Called when the children of the given path changed.
 *
 * @param parentPath The parent path
 * @param currentChildren The children or null if the root node (parent path) was deleted.
 * @throws Exception
 */
@Override
public void doHandleChildChange(String parentPath, List<String> currentChildren)
    throws Exception {
  if (currentChildren == null) {
    LOG.info("handleChildChange on path " + parentPath + " was invoked with NULL list of children");
  } else {
    LOG.info("ProcessorChangeHandler::handleChildChange - Path: {} Current Children: {} ", parentPath, currentChildren);
    onProcessorChange(currentChildren);
  }
}
Code example source: org.apache.samza/samza-core_2.12
ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) {
  this.config = config;
  this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
  this.processorId = createProcessorId(config);
  this.zkUtils = zkUtils;
  // setup a listener for a session state change
  // we are mostly interested in "session closed" and "new session created" events
  zkUtils.getZkClient().subscribeStateChanges(new ZkSessionStateChangedListener());
  leaderElector = new ZkLeaderElector(processorId, zkUtils);
  leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
  this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
  this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId);
  debounceTimer = new ScheduleAfterDebounceTime(processorId);
  debounceTimer.setScheduledTaskCallback(throwable -> {
    LOG.error("Received exception in debounce timer! Stopping the job coordinator", throwable);
    stop();
  });
  this.barrier = new ZkBarrierForVersionUpgrade(zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(), zkUtils, new ZkBarrierListenerImpl(), debounceTimer);
  systemAdmins = new SystemAdmins(config);
  streamMetadataCache = new StreamMetadataCache(systemAdmins, METADATA_CACHE_TTL_MS, SystemClock.instance());
}
Code example source: org.apache.samza/samza-core_2.10
public void onBarrierStateChanged(final String version, ZkBarrierForVersionUpgrade.State state) {
  LOG.info("JobModel version " + version + " obtained consensus successfully!");
  metrics.barrierStateChange.inc();
  metrics.singleBarrierRebalancingTime.update(System.nanoTime() - startTime);
  if (ZkBarrierForVersionUpgrade.State.DONE.equals(state)) {
    debounceTimer.scheduleAfterDebounceTime(barrierAction, 0, () -> {
      LOG.info("pid=" + processorId + "new version " + version + " of the job model got confirmed");
      // read the new Model
      JobModel jobModel = getJobModel();
      // start the container with the new model
      if (coordinatorListener != null) {
        coordinatorListener.onNewJobModel(processorId, jobModel);
      }
    });
  } else {
    if (ZkBarrierForVersionUpgrade.State.TIMED_OUT.equals(state)) {
      // no-op for non-leaders
      // for leader: make sure we do not stop - so generate a new job model
      LOG.warn("Barrier for version " + version + " timed out.");
      if (leaderElector.amILeader()) {
        LOG.info("Leader will schedule a new job model generation");
        debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, () -> {
          // actual actions to do are the same as onProcessorChange
          doOnProcessorChange(new ArrayList<>());
        });
      }
    }
  }
}
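The branching in onBarrierStateChanged can be easier to follow as a small decision function. The sketch below is a library-free model of that logic, not Samza code: the class BarrierDecisionSketch, its enums, and the PENDING placeholder state are hypothetical names introduced only for illustration.

```java
/** Hypothetical, library-free model of the decision taken when a barrier changes state. */
class BarrierDecisionSketch {
    /** Illustrative states; PENDING is a placeholder for any state other than DONE or TIMED_OUT. */
    enum State { PENDING, DONE, TIMED_OUT }

    /** Illustrative outcomes of the decision. */
    enum Action { APPLY_NEW_JOB_MODEL, REGENERATE_JOB_MODEL, NONE }

    /**
     * DONE: every processor applies the confirmed job model.
     * TIMED_OUT: only the leader reacts, by scheduling a new job model generation.
     * Anything else: no action.
     */
    static Action decide(State state, boolean isLeader) {
        if (state == State.DONE) {
            return Action.APPLY_NEW_JOB_MODEL;
        }
        if (state == State.TIMED_OUT && isLeader) {
            return Action.REGENERATE_JOB_MODEL;
        }
        return Action.NONE;
    }

    public static void main(String[] args) {
        // Print the decision for every combination of state and leadership.
        for (State state : State.values()) {
            for (boolean leader : new boolean[] {true, false}) {
                System.out.println(state + " leader=" + leader + " -> " + decide(state, leader));
            }
        }
    }
}
```

Keeping the decision separate from the side effects (logging, metrics, scheduling) is only one possible arrangement; it makes the leader and follower paths straightforward to unit test.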
Code example source: apache/samza
@Test
public void testShouldStopPartitionCountMonitorWhenStoppingTheJobCoordinator() {
  ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
  ZkClient mockZkClient = Mockito.mock(ZkClient.class);
  when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
  ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
  when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
  when(zkUtils.getZkClient()).thenReturn(mockZkClient);
  when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
  ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
  ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
  StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class);
  zkJobCoordinator.debounceTimer = mockDebounceTimer;
  zkJobCoordinator.streamPartitionCountMonitor = monitor;
  zkJobCoordinator.stop();
  Mockito.verify(monitor).stop();
}
Code example source: org.apache.samza/samza-core_2.10
public void onProcessorChange(List<String> processors) {
  if (leaderElector.amILeader()) {
    LOG.info("ZkJobCoordinator::onProcessorChange - list of processors changed. List size=" + processors.size());
    debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, () -> doOnProcessorChange(processors));
  }
}
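The snippet above relies on debouncing: a burst of processor changes should trigger a single job model generation once things settle. The following is a minimal sketch of that idea using only the JDK. DebounceSketch is a hypothetical class and is not Samza's ScheduleAfterDebounceTime implementation; its method merely borrows the name scheduleAfterDebounceTime to make the parallel visible.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical debouncer: a newer request under the same action name replaces a pending one. */
class DebounceSketch {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final Map<String, ScheduledFuture<?>> pending = new ConcurrentHashMap<>();

    /** Schedules the action after delayMs, cancelling a not-yet-started action with the same name. */
    synchronized void scheduleAfterDebounceTime(String actionName, long delayMs, Runnable action) {
        ScheduledFuture<?> previous = pending.get(actionName);
        if (previous != null) {
            previous.cancel(false); // do not interrupt an action that is already running
        }
        pending.put(actionName, executor.schedule(action, delayMs, TimeUnit.MILLISECONDS));
    }

    /** Lets already scheduled, non-cancelled actions run, then waits for the executor to finish. */
    void shutdown() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        DebounceSketch debouncer = new DebounceSketch();
        List<String> handled = new CopyOnWriteArrayList<>();
        // Three changes arrive in quick succession; only the last one should be handled.
        debouncer.scheduleAfterDebounceTime("ON_PROCESSOR_CHANGE", 300, () -> handled.add("p1"));
        debouncer.scheduleAfterDebounceTime("ON_PROCESSOR_CHANGE", 300, () -> handled.add("p1,p2"));
        debouncer.scheduleAfterDebounceTime("ON_PROCESSOR_CHANGE", 300, () -> handled.add("p1,p2,p3"));
        debouncer.shutdown();
        System.out.println("Handled processor lists: " + handled);
    }
}
```

A single-threaded executor is assumed here so that actions never run concurrently with each other; whether that matches the real timer's threading model is not something the snippets in this article confirm.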
Code example source: apache/samza
@Test
public void testShouldStartPartitionCountMonitorOnBecomingLeader() {
  ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
  ZkClient mockZkClient = Mockito.mock(ZkClient.class);
  when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
  ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
  when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
  when(zkUtils.getZkClient()).thenReturn(mockZkClient);
  when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
  ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
  ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
  StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class);
  zkJobCoordinator.debounceTimer = mockDebounceTimer;
  zkJobCoordinator.streamPartitionCountMonitor = monitor;
  when(zkJobCoordinator.getPartitionCountMonitor()).thenReturn(monitor);
  ZkJobCoordinator.LeaderElectorListenerImpl listener = zkJobCoordinator.new LeaderElectorListenerImpl();
  listener.onBecomingLeader();
  Mockito.verify(monitor).start();
}
Code example source: org.apache.samza/samza-core_2.10
JobModel jobModel = generateNewJobModel(currentProcessorIds);
Code example source: apache/samza
JobModel jobModel = generateNewJobModel(processorNodes);
storeConfigInCoordinatorStream();
hasCreatedStreams = true;
Code example source: org.apache.samza/samza-core_2.11
@Override
public void start() {
  ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder();
  zkUtils.validateZkVersion();
  zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(),
      keyBuilder.getJobModelPathPrefix()});
  startMetrics();
  systemAdmins.start();
  leaderElector.tryBecomeLeader();
  zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler(zkUtils));
}
Code example source: org.apache.samza/samza-core_2.10
shutdownMetrics();
Code example source: apache/samza
JobModel jobModel = getJobModel();
Code example source: apache/samza
@Test
public void testFollowerShouldStopWhenNotPartOfGeneratedJobModel() throws Exception {
  ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
  ZkClient mockZkClient = Mockito.mock(ZkClient.class);
  CountDownLatch jcShutdownLatch = new CountDownLatch(1);
  when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
  ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
  when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
  when(zkUtils.getZkClient()).thenReturn(mockZkClient);
  when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
  ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
  doAnswer(new Answer<Void>() {
    public Void answer(InvocationOnMock invocation) {
      jcShutdownLatch.countDown();
      return null;
    }
  }).when(zkJobCoordinator).stop();
  final ZkJobCoordinator.ZkJobModelVersionChangeHandler zkJobModelVersionChangeHandler = zkJobCoordinator.new ZkJobModelVersionChangeHandler(zkUtils);
  zkJobModelVersionChangeHandler.doHandleDataChange("path", TEST_JOB_MODEL_VERSION);
  verify(zkJobCoordinator, Mockito.atMost(1)).stop();
  assertTrue("Timed out waiting for JobCoordinator to stop", jcShutdownLatch.await(1, TimeUnit.MINUTES));
}
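The test above waits on a CountDownLatch that is released when stop() is called, which is a common way to assert on an asynchronous shutdown. Below is a minimal JDK-only sketch of that pattern. LatchOnStopSketch and FakeCoordinator are hypothetical stand-ins, and the worker thread is an assumption used to mimic stop() being triggered from another thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of waiting for an asynchronous stop() call with a latch. */
class LatchOnStopSketch {
    /** Stand-in for a coordinator; it simply runs a hook when stop() is invoked. */
    static class FakeCoordinator {
        private final Runnable onStop;

        FakeCoordinator(Runnable onStop) {
            this.onStop = onStop;
        }

        void stop() {
            onStop.run();
        }
    }

    /** Returns true if stop() was observed before the timeout elapsed. */
    static boolean stopObservedWithin(long timeoutMs) {
        CountDownLatch stopped = new CountDownLatch(1);
        FakeCoordinator coordinator = new FakeCoordinator(stopped::countDown);
        // Trigger stop() from another thread, as an event handler might.
        Thread worker = new Thread(coordinator::stop);
        worker.setDaemon(true);
        worker.start();
        try {
            return stopped.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("stop() observed: " + stopObservedWithin(5000));
    }
}
```

Awaiting with a timeout, rather than blocking indefinitely, means a missing stop() call shows up as a failed assertion instead of a hung test.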