org.apache.samza.zk.ZkJobCoordinator类的使用及代码示例

x33g5p2x  于2022-02-05 转载在 其他  
字(12.4k)|赞(0)|评价(0)|浏览(118)

本文整理了Java中org.apache.samza.zk.ZkJobCoordinator类的一些代码示例,展示了ZkJobCoordinator类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ZkJobCoordinator类的具体详情如下:
包路径:org.apache.samza.zk.ZkJobCoordinator
类名称:ZkJobCoordinator

ZkJobCoordinator介绍

[英]JobCoordinator for standalone processors managed via ZooKeeper.
[中]通过Zookeeper管理的独立处理器的JobCoordinator。

代码示例

代码示例来源:origin: apache/samza

/**
 * Builds a {@link ZkJobCoordinator} for the given processor.
 *
 * <p>The ZooKeeper base path for job coordination is resolved from the {@link Config} and used,
 * together with the metrics registry, to obtain the {@link ZkUtils} handed to the coordinator.
 *
 * @param processorId identifier of the processor, passed through to the {@link ZkJobCoordinator}
 * @param config configuration used to resolve the coordination path, to create the ZooKeeper
 *               utilities and to construct the {@link ZkJobCoordinator}
 * @param metricsRegistry registry passed to both the ZooKeeper utilities and the coordinator
 * @return a new instance of {@link ZkJobCoordinator}
 */
@Override
public JobCoordinator getJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry) {
 // TODO: Separate JC related configs into a "ZkJobCoordinatorConfig"
 final ZkUtils coordinatorZkUtils = getZkUtils(config, metricsRegistry, getJobCoordinationZkPath(config));
 LOG.debug("Creating ZkJobCoordinator with config: {}.", config);
 return new ZkJobCoordinator(processorId, config, metricsRegistry, coordinatorZkUtils);
}

代码示例来源:origin: apache/samza

/**
 * Handles a failure reported by the job model version barrier.
 *
 * <p>Logs the failure together with its cause (so the stack trace is not lost), increments the
 * barrier error metric and stops this coordinator.
 *
 * @param version job model version for which consensus was being attained
 * @param t the error that was encountered; attached to the log entry
 */
@Override
 public void onBarrierError(String version, Throwable t) {
  // Pass the throwable as the last argument so the logger records the stack trace.
  LOG.error("Encountered error while attaining consensus on JobModel version " + version, t);
  metrics.barrierError.inc();
  stop();
 }
}

代码示例来源:origin: org.apache.samza/samza-core

/**
  * Invoked when the set of children under the given path changes.
  *
  * <p>A non-null child list is logged and forwarded to {@code onProcessorChange}; a null list
  * is only logged.
  *
  * @param parentPath      the path whose children changed
  * @param currentChildren the current children, or null if the root node (parent path) was deleted
  * @throws Exception declared by the handler contract
  */
 @Override
 public void doHandleChildChange(String parentPath, List<String> currentChildren)
   throws Exception {
  if (currentChildren != null) {
   LOG.info("ProcessorChangeHandler::handleChildChange - Path: {} Current Children: {} ", parentPath, currentChildren);
   onProcessorChange(currentChildren);
   return;
  }
  // Nothing to propagate when the child list is absent; just record the event.
  LOG.info("handleChildChange on path " + parentPath + " was invoked with NULL list of children");
 }
}

代码示例来源:origin: org.apache.samza/samza-core_2.12

/**
 * Creates the coordinator and wires up its collaborators: the ZooKeeper session state listener,
 * the leader elector, the debounce timer, the job model version barrier and the stream
 * metadata cache. Nothing is started here apart from the listener registrations below.
 *
 * @param config job configuration; also used to derive the processor id, the debounce time and
 *               the metrics reporters
 * @param metricsRegistry registry backing this coordinator's {@code ZkJobCoordinatorMetrics}
 * @param zkUtils ZooKeeper helper shared with the leader elector and the barrier
 */
ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) {
 this.config = config;
 this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
 // processorId must be set before the leader elector, reporters and debounce timer are built
 this.processorId = createProcessorId(config);
 this.zkUtils = zkUtils;
 // setup a listener for a session state change
 // we are mostly interested in "session closed" and "new session created" events
 zkUtils.getZkClient().subscribeStateChanges(new ZkSessionStateChangedListener());
 leaderElector = new ZkLeaderElector(processorId, zkUtils);
 leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
 this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
 this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId);
 debounceTimer = new ScheduleAfterDebounceTime(processorId);
 // callback receives a throwable from the debounce timer; the coordinator logs it and stops
 // NOTE(review): presumably invoked when a scheduled task fails - confirm in ScheduleAfterDebounceTime
 debounceTimer.setScheduledTaskCallback(throwable -> {
   LOG.error("Received exception in debounce timer! Stopping the job coordinator", throwable);
   stop();
  });
 // the barrier shares the debounce timer created above, so it must be constructed after it
 this.barrier =  new ZkBarrierForVersionUpgrade(zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(), zkUtils, new ZkBarrierListenerImpl(), debounceTimer);
 systemAdmins = new SystemAdmins(config);
 streamMetadataCache = new StreamMetadataCache(systemAdmins, METADATA_CACHE_TTL_MS, SystemClock.instance());
}

代码示例来源:origin: org.apache.samza/samza-core_2.10

/**
 * Reacts to a state change of the barrier for the given job model version.
 *
 * <p>Every state change is counted and the rebalancing-time metric is updated. On
 * {@code DONE} the new job model is read and handed to the coordinator listener (if any) via
 * the debounce timer. On {@code TIMED_OUT} only the leader acts, scheduling a fresh job model
 * generation. Other states are only logged.
 *
 * @param version job model version the barrier was created for
 * @param state   the new barrier state
 */
public void onBarrierStateChanged(final String version, ZkBarrierForVersionUpgrade.State state) {
 // Log the actual state; success is only reported in the DONE branch below.
 LOG.info("Barrier for JobModel version {} changed state to {}.", version, state);
 metrics.barrierStateChange.inc();
 metrics.singleBarrierRebalancingTime.update(System.nanoTime() - startTime);
 if (ZkBarrierForVersionUpgrade.State.DONE.equals(state)) {
  LOG.info("JobModel version " + version + " obtained consensus successfully!");
  debounceTimer.scheduleAfterDebounceTime(barrierAction, 0, () -> {
    LOG.info("pid=" + processorId + " new version " + version + " of the job model got confirmed");
    // read the new Model
    JobModel jobModel = getJobModel();
    // start the container with the new model
    if (coordinatorListener != null) {
     coordinatorListener.onNewJobModel(processorId, jobModel);
    }
   });
 } else if (ZkBarrierForVersionUpgrade.State.TIMED_OUT.equals(state)) {
  // no-op for non-leaders
  // for leader: make sure we do not stop - so generate a new job model
  LOG.warn("Barrier for version " + version + " timed out.");
  if (leaderElector.amILeader()) {
   LOG.info("Leader will schedule a new job model generation");
   debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, () -> {
     // actual actions to do are the same as onProcessorChange
     doOnProcessorChange(new ArrayList<>());
    });
  }
 }
}

代码示例来源:origin: apache/samza

/**
 * Verifies that calling {@code stop()} on the coordinator also stops the
 * {@link StreamPartitionCountMonitor} assigned to it.
 */
@Test
 public void testShouldStopPartitionCountMonitorWhenStoppingTheJobCoordinator() {
  // Arrange: ZooKeeper collaborators are mocked and stubbed before the coordinator is built.
  ZkClient zkClientMock = Mockito.mock(ZkClient.class);
  ZkKeyBuilder barrierKeyBuilder = Mockito.mock(ZkKeyBuilder.class);
  when(barrierKeyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);

  ZkUtils zkUtilsMock = Mockito.mock(ZkUtils.class);
  when(zkUtilsMock.getKeyBuilder()).thenReturn(barrierKeyBuilder);
  when(zkUtilsMock.getZkClient()).thenReturn(zkClientMock);
  when(zkUtilsMock.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));

  ScheduleAfterDebounceTime debounceTimerMock = Mockito.mock(ScheduleAfterDebounceTime.class);
  StreamPartitionCountMonitor partitionMonitorMock = Mockito.mock(StreamPartitionCountMonitor.class);

  ZkJobCoordinator coordinatorSpy =
    Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtilsMock));
  coordinatorSpy.debounceTimer = debounceTimerMock;
  coordinatorSpy.streamPartitionCountMonitor = partitionMonitorMock;

  // Act
  coordinatorSpy.stop();

  // Assert
  Mockito.verify(partitionMonitorMock).stop();
 }
}

代码示例来源:origin: org.apache.samza/samza-core_2.10

/**
 * Handles a change in the list of processors. Only acts when this coordinator's leader elector
 * reports leadership: the change is logged and {@code doOnProcessorChange} is scheduled on the
 * debounce timer after {@code debounceTimeMs}.
 *
 * @param processors the current list of processors
 */
public void onProcessorChange(List<String> processors) {
 if (!leaderElector.amILeader()) {
  // non-leaders ignore processor membership changes
  return;
 }
 final int processorCount = processors.size();
 LOG.info("ZkJobCoordinator::onProcessorChange - list of processors changed. List size=" + processorCount);
 debounceTimer.scheduleAfterDebounceTime(
   ON_PROCESSOR_CHANGE,
   debounceTimeMs,
   () -> doOnProcessorChange(processors));
}

代码示例来源:origin: apache/samza

/**
 * Verifies that the {@link StreamPartitionCountMonitor} is started when the leader elector
 * listener is told that this processor became the leader.
 */
@Test
public void testShouldStartPartitionCountMonitorOnBecomingLeader() {
 // Arrange: stub the ZooKeeper collaborators the coordinator constructor touches.
 ZkClient zkClientMock = Mockito.mock(ZkClient.class);
 ZkKeyBuilder barrierKeyBuilder = Mockito.mock(ZkKeyBuilder.class);
 when(barrierKeyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);

 ZkUtils zkUtilsMock = Mockito.mock(ZkUtils.class);
 when(zkUtilsMock.getKeyBuilder()).thenReturn(barrierKeyBuilder);
 when(zkUtilsMock.getZkClient()).thenReturn(zkClientMock);
 when(zkUtilsMock.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));

 ScheduleAfterDebounceTime debounceTimerMock = Mockito.mock(ScheduleAfterDebounceTime.class);
 StreamPartitionCountMonitor partitionMonitorMock = Mockito.mock(StreamPartitionCountMonitor.class);

 ZkJobCoordinator coordinatorSpy =
   Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtilsMock));
 coordinatorSpy.debounceTimer = debounceTimerMock;
 coordinatorSpy.streamPartitionCountMonitor = partitionMonitorMock;
 when(coordinatorSpy.getPartitionCountMonitor()).thenReturn(partitionMonitorMock);

 // Act: simulate the leadership callback on a listener bound to the spied coordinator.
 ZkJobCoordinator.LeaderElectorListenerImpl leaderListener = coordinatorSpy.new LeaderElectorListenerImpl();
 leaderListener.onBecomingLeader();

 // Assert
 Mockito.verify(partitionMonitorMock).start();
}

代码示例来源:origin: org.apache.samza/samza-core_2.10

JobModel jobModel = generateNewJobModel(currentProcessorIds);

代码示例来源:origin: apache/samza

JobModel jobModel = generateNewJobModel(processorNodes);
 storeConfigInCoordinatorStream();
 hasCreatedStreams = true;

代码示例来源:origin: org.apache.samza/samza-core_2.11

/**
 * Starts the coordinator: validates the ZooKeeper version and the required paths, starts
 * metrics and system admins, attempts to become leader and finally subscribes to job model
 * version changes.
 */
@Override
public void start() {
 final ZkKeyBuilder zkKeyBuilder = zkUtils.getKeyBuilder();
 zkUtils.validateZkVersion();

 // paths that are validated before anything else is started
 final String[] requiredPaths = {
   zkKeyBuilder.getProcessorsPath(),
   zkKeyBuilder.getJobModelVersionPath(),
   zkKeyBuilder.getJobModelPathPrefix()
 };
 zkUtils.validatePaths(requiredPaths);

 startMetrics();
 systemAdmins.start();
 leaderElector.tryBecomeLeader();
 zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler(zkUtils));
}

代码示例来源:origin: org.apache.samza/samza-core_2.10

shutdownMetrics();

代码示例来源:origin: apache/samza

JobModel jobModel = getJobModel();

代码示例来源:origin: org.apache.samza/samza-core_2.10

/**
 * Constructs the coordinator from configuration and a ZooKeeper helper, creating the metrics,
 * leader elector, debounce timer, job model version barrier, system admins and stream metadata
 * cache it relies on. The statements are order-sensitive: later collaborators consume fields
 * assigned earlier in this constructor.
 *
 * @param config job configuration; the processor id, debounce time and metrics reporters are
 *               derived from it
 * @param metricsRegistry registry used to create {@code ZkJobCoordinatorMetrics}
 * @param zkUtils ZooKeeper helper passed on to the leader elector and the barrier
 */
ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) {
 this.config = config;
 this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
 // derived from config; consumed below by the leader elector, reporters and debounce timer
 this.processorId = createProcessorId(config);
 this.zkUtils = zkUtils;
 // setup a listener for a session state change
 // we are mostly interested in "session closed" and "new session created" events
 zkUtils.getZkClient().subscribeStateChanges(new ZkSessionStateChangedListener());
 leaderElector = new ZkLeaderElector(processorId, zkUtils);
 leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
 this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
 this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId);
 debounceTimer = new ScheduleAfterDebounceTime(processorId);
 // on a throwable handed over by the debounce timer: log it and stop this coordinator
 // NOTE(review): exact trigger of this callback is defined in ScheduleAfterDebounceTime - confirm
 debounceTimer.setScheduledTaskCallback(throwable -> {
   LOG.error("Received exception in debounce timer! Stopping the job coordinator", throwable);
   stop();
  });
 // requires debounceTimer, hence created only after the timer above
 this.barrier =  new ZkBarrierForVersionUpgrade(zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(), zkUtils, new ZkBarrierListenerImpl(), debounceTimer);
 systemAdmins = new SystemAdmins(config);
 // the cache is built on top of the system admins created on the previous line
 streamMetadataCache = new StreamMetadataCache(systemAdmins, METADATA_CACHE_TTL_MS, SystemClock.instance());
}

代码示例来源:origin: org.apache.samza/samza-core_2.11

/**
 * Callback for a state change of the job model version barrier.
 *
 * <p>The state-change counter and the rebalancing-time metric are updated for every call.
 * When the barrier is {@code DONE}, the confirmed job model is read and passed to the
 * coordinator listener (when one is set) through the debounce timer. When the barrier is
 * {@code TIMED_OUT}, non-leaders do nothing while the leader schedules a new job model
 * generation. Any other state is only logged.
 *
 * @param version job model version associated with the barrier
 * @param state   the state the barrier transitioned to
 */
public void onBarrierStateChanged(final String version, ZkBarrierForVersionUpgrade.State state) {
 // Report the real state here; the success message belongs to the DONE branch only.
 LOG.info("Barrier for JobModel version {} changed state to {}.", version, state);
 metrics.barrierStateChange.inc();
 metrics.singleBarrierRebalancingTime.update(System.nanoTime() - startTime);
 if (ZkBarrierForVersionUpgrade.State.DONE.equals(state)) {
  LOG.info("JobModel version " + version + " obtained consensus successfully!");
  debounceTimer.scheduleAfterDebounceTime(barrierAction, 0, () -> {
    LOG.info("pid=" + processorId + " new version " + version + " of the job model got confirmed");
    // read the new Model
    JobModel jobModel = getJobModel();
    // start the container with the new model
    if (coordinatorListener != null) {
     coordinatorListener.onNewJobModel(processorId, jobModel);
    }
   });
 } else if (ZkBarrierForVersionUpgrade.State.TIMED_OUT.equals(state)) {
  // no-op for non-leaders
  // for leader: make sure we do not stop - so generate a new job model
  LOG.warn("Barrier for version " + version + " timed out.");
  if (leaderElector.amILeader()) {
   LOG.info("Leader will schedule a new job model generation");
   debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, () -> {
     // actual actions to do are the same as onProcessorChange
     doOnProcessorChange(new ArrayList<>());
    });
  }
 }
}

代码示例来源:origin: apache/samza

/**
 * Feeds a job model version change to the version change handler of a spied coordinator whose
 * {@code stop()} is replaced by a latch countdown, then checks that {@code stop()} was invoked
 * at most once and that the latch is released within one minute.
 */
@Test
public void testFollowerShouldStopWhenNotPartOfGeneratedJobModel() throws Exception {
 final CountDownLatch stopSignal = new CountDownLatch(1);

 // Arrange: stub the ZooKeeper collaborators the coordinator constructor touches.
 ZkClient zkClientMock = Mockito.mock(ZkClient.class);
 ZkKeyBuilder barrierKeyBuilder = Mockito.mock(ZkKeyBuilder.class);
 when(barrierKeyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);

 ZkUtils zkUtilsMock = Mockito.mock(ZkUtils.class);
 when(zkUtilsMock.getKeyBuilder()).thenReturn(barrierKeyBuilder);
 when(zkUtilsMock.getZkClient()).thenReturn(zkClientMock);
 when(zkUtilsMock.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));

 ZkJobCoordinator coordinatorSpy =
   Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtilsMock));

 // Replace stop() on the spy with an answer that only releases the latch.
 Answer<Void> releaseLatch = (InvocationOnMock invocation) -> {
  stopSignal.countDown();
  return null;
 };
 doAnswer(releaseLatch).when(coordinatorSpy).stop();

 // Act
 final ZkJobCoordinator.ZkJobModelVersionChangeHandler versionChangeHandler =
   coordinatorSpy.new ZkJobModelVersionChangeHandler(zkUtilsMock);
 versionChangeHandler.doHandleDataChange("path", TEST_JOB_MODEL_VERSION);

 // Assert
 verify(coordinatorSpy, Mockito.atMost(1)).stop();
 assertTrue("Timed out waiting for JobCoordinator to stop", stopSignal.await(1, TimeUnit.MINUTES));
}

代码示例来源:origin: org.apache.samza/samza-core

/**
 * Called with the updated list of processors. If the leader elector reports that this
 * coordinator is the leader, the change is logged and {@code doOnProcessorChange} is scheduled
 * on the debounce timer with a delay of {@code debounceTimeMs}; otherwise the call is ignored.
 *
 * @param processors the current list of processors
 */
public void onProcessorChange(List<String> processors) {
 final boolean isLeader = leaderElector.amILeader();
 if (!isLeader) {
  return;
 }
 LOG.info("ZkJobCoordinator::onProcessorChange - list of processors changed. List size=" + processors.size());
 final Runnable processorChangeAction = () -> doOnProcessorChange(processors);
 debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, processorChangeAction);
}

代码示例来源:origin: org.apache.samza/samza-core

JobModel jobModel = generateNewJobModel(currentProcessorIds);

代码示例来源:origin: org.apache.samza/samza-core_2.12

/**
 * Brings the coordinator up. In order: validates the ZooKeeper version, validates the
 * processors / job model version / job model paths, starts metrics and system admins, tries
 * to become leader, and subscribes to job model version changes.
 */
@Override
public void start() {
 final ZkKeyBuilder pathBuilder = zkUtils.getKeyBuilder();
 zkUtils.validateZkVersion();

 final String processorsPath = pathBuilder.getProcessorsPath();
 final String jobModelVersionPath = pathBuilder.getJobModelVersionPath();
 final String jobModelPathPrefix = pathBuilder.getJobModelPathPrefix();
 zkUtils.validatePaths(new String[]{processorsPath, jobModelVersionPath, jobModelPathPrefix});

 startMetrics();
 systemAdmins.start();
 leaderElector.tryBecomeLeader();

 // subscribe last, after the leader election attempt
 zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler(zkUtils));
}

代码示例来源:origin: org.apache.samza/samza-core_2.11

shutdownMetrics();

相关文章