org.apache.samza.zk.ZkJobCoordinator.stop()方法的使用及代码示例

x33g5p2x  于2022-02-05 转载在 其他  
字(10.3k)|赞(0)|评价(0)|浏览(109)

本文整理了Java中org.apache.samza.zk.ZkJobCoordinator.stop()方法的一些代码示例,展示了ZkJobCoordinator.stop()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。ZkJobCoordinator.stop()方法的具体详情如下:
包路径:org.apache.samza.zk.ZkJobCoordinator
类名称:ZkJobCoordinator
方法名:stop

ZkJobCoordinator.stop介绍

暂无

代码示例

代码示例来源:origin: apache/samza

@Override
 public void onBarrierError(String version, Throwable t) {
  LOG.error("Encountered error while attaining consensus on JobModel version " + version);
  metrics.barrierError.inc();
  stop();
 }
}

代码示例来源:origin: org.apache.samza/samza-core_2.12

@Override
 public void onBarrierError(String version, Throwable t) {
  LOG.error("Encountered error while attaining consensus on JobModel version " + version);
  metrics.barrierError.inc();
  stop();
 }
}

代码示例来源:origin: apache/samza

@Override
 public void doHandleDataDeleted(String dataPath) {
  LOG.warn("JobModel version z-node has been deleted. Shutting down the coordinator" + dataPath);
  debounceTimer.scheduleAfterDebounceTime("JOB_MODEL_VERSION_DELETED", 0,  () -> stop());
 }
}

代码示例来源:origin: org.apache.samza/samza-core_2.11

@Override
 public void onBarrierError(String version, Throwable t) {
  LOG.error("Encountered error while attaining consensus on JobModel version " + version);
  metrics.barrierError.inc();
  stop();
 }
}

代码示例来源:origin: org.apache.samza/samza-core_2.12

@Override
 public void doHandleDataDeleted(String dataPath) {
  LOG.warn("JobModel version z-node has been deleted. Shutting down the coordinator" + dataPath);
  debounceTimer.scheduleAfterDebounceTime("JOB_MODEL_VERSION_DELETED", 0,  () -> stop());
 }
}

代码示例来源:origin: org.apache.samza/samza-core_2.12

@Override
 public void handleSessionEstablishmentError(Throwable error) {
  // this means we cannot connect to zookeeper to establish a session
  LOG.info("handleSessionEstablishmentError received for processor=" + processorId, error);
  debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_ERROR, 0, () -> stop());
 }
}

代码示例来源:origin: org.apache.samza/samza-core

@Override
 public void handleSessionEstablishmentError(Throwable error) {
  // this means we cannot connect to zookeeper to establish a session
  LOG.info("handleSessionEstablishmentError received for processor=" + processorId, error);
  debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_ERROR, 0, () -> stop());
 }
}

代码示例来源:origin: org.apache.samza/samza-core_2.11

@Override
 public void doHandleDataDeleted(String dataPath) {
  LOG.warn("JobModel version z-node has been deleted. Shutting down the coordinator" + dataPath);
  debounceTimer.scheduleAfterDebounceTime("JOB_MODEL_VERSION_DELETED", 0,  () -> stop());
 }
}

代码示例来源:origin: org.apache.samza/samza-core

@Override
 public void doHandleDataDeleted(String dataPath) {
  LOG.warn("JobModel version z-node has been deleted. Shutting down the coordinator" + dataPath);
  debounceTimer.scheduleAfterDebounceTime("JOB_MODEL_VERSION_DELETED", 0,  () -> stop());
 }
}

代码示例来源:origin: org.apache.samza/samza-core_2.11

@Override
 public void handleSessionEstablishmentError(Throwable error) {
  // this means we cannot connect to zookeeper to establish a session
  LOG.info("handleSessionEstablishmentError received for processor=" + processorId, error);
  debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_ERROR, 0, () -> stop());
 }
}

代码示例来源:origin: org.apache.samza/samza-core_2.10

@Override
 public void onBarrierError(String version, Throwable t) {
  LOG.error("Encountered error while attaining consensus on JobModel version " + version);
  metrics.barrierError.inc();
  stop();
 }
}

代码示例来源:origin: org.apache.samza/samza-core_2.10

@Override
 public void doHandleDataDeleted(String dataPath) {
  LOG.warn("JobModel version z-node has been deleted. Shutting down the coordinator" + dataPath);
  debounceTimer.scheduleAfterDebounceTime("JOB_MODEL_VERSION_DELETED", 0,  () -> stop());
 }
}

代码示例来源:origin: org.apache.samza/samza-core_2.10

@Override
 public void handleSessionEstablishmentError(Throwable error) {
  // this means we cannot connect to zookeeper to establish a session
  LOG.info("handleSessionEstablishmentError received for processor=" + processorId, error);
  debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_ERROR, 0, () -> stop());
 }
}

代码示例来源:origin: org.apache.samza/samza-core

@Override
 public void onBarrierError(String version, Throwable t) {
  LOG.error("Encountered error while attaining consensus on JobModel version " + version);
  metrics.barrierError.inc();
  stop();
 }
}

代码示例来源:origin: apache/samza

@Override
 public void handleSessionEstablishmentError(Throwable error) {
  // this means we cannot connect to zookeeper to establish a session
  zkSessionMetrics.zkSessionErrors.inc();
  LOG.info("handleSessionEstablishmentError received for processor=" + processorId, error);
  debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_ERROR, 0, () -> stop());
 }
}

代码示例来源:origin: org.apache.samza/samza-core_2.11

/**
 * Invoked when there is a change to the JobModelVersion z-node. It signifies that a new JobModel version is available.
 */
@Override
public void doHandleDataChange(String dataPath, Object data) {
 debounceTimer.scheduleAfterDebounceTime(JOB_MODEL_VERSION_CHANGE, 0, () -> {
   String jobModelVersion = (String) data;
   LOG.info("Got a notification for new JobModel version. Path = {} Version = {}", dataPath, data);
   newJobModel = zkUtils.getJobModel(jobModelVersion);
   LOG.info("pid=" + processorId + ": new JobModel is available. Version =" + jobModelVersion + "; JobModel = " + newJobModel);
   if (!newJobModel.getContainers().containsKey(processorId)) {
    LOG.info("New JobModel does not contain pid={}. Stopping this processor. New JobModel: {}",
      processorId, newJobModel);
    stop();
   } else {
    // stop current work
    if (coordinatorListener != null) {
     coordinatorListener.onJobModelExpired();
    }
    // update ZK and wait for all the processors to get this new version
    barrier.join(jobModelVersion, processorId);
   }
  });
}

代码示例来源:origin: apache/samza

@Test
 public void testShouldStopPartitionCountMonitorWhenStoppingTheJobCoordinator() {
  ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
  ZkClient mockZkClient = Mockito.mock(ZkClient.class);
  when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);

  ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
  when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
  when(zkUtils.getZkClient()).thenReturn(mockZkClient);
  when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));

  ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);

  ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils));

  StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class);
  zkJobCoordinator.debounceTimer = mockDebounceTimer;
  zkJobCoordinator.streamPartitionCountMonitor = monitor;

  zkJobCoordinator.stop();

  Mockito.verify(monitor).stop();
 }
}

代码示例来源:origin: org.apache.samza/samza-core_2.12

ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) {
 this.config = config;
 this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
 this.processorId = createProcessorId(config);
 this.zkUtils = zkUtils;
 // setup a listener for a session state change
 // we are mostly interested in "session closed" and "new session created" events
 zkUtils.getZkClient().subscribeStateChanges(new ZkSessionStateChangedListener());
 leaderElector = new ZkLeaderElector(processorId, zkUtils);
 leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
 this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
 this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId);
 debounceTimer = new ScheduleAfterDebounceTime(processorId);
 debounceTimer.setScheduledTaskCallback(throwable -> {
   LOG.error("Received exception in debounce timer! Stopping the job coordinator", throwable);
   stop();
  });
 this.barrier =  new ZkBarrierForVersionUpgrade(zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(), zkUtils, new ZkBarrierListenerImpl(), debounceTimer);
 systemAdmins = new SystemAdmins(config);
 streamMetadataCache = new StreamMetadataCache(systemAdmins, METADATA_CACHE_TTL_MS, SystemClock.instance());
}

代码示例来源:origin: org.apache.samza/samza-core_2.10

ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) {
 this.config = config;
 this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
 this.processorId = createProcessorId(config);
 this.zkUtils = zkUtils;
 // setup a listener for a session state change
 // we are mostly interested in "session closed" and "new session created" events
 zkUtils.getZkClient().subscribeStateChanges(new ZkSessionStateChangedListener());
 leaderElector = new ZkLeaderElector(processorId, zkUtils);
 leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
 this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
 this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId);
 debounceTimer = new ScheduleAfterDebounceTime(processorId);
 debounceTimer.setScheduledTaskCallback(throwable -> {
   LOG.error("Received exception in debounce timer! Stopping the job coordinator", throwable);
   stop();
  });
 this.barrier =  new ZkBarrierForVersionUpgrade(zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(), zkUtils, new ZkBarrierListenerImpl(), debounceTimer);
 systemAdmins = new SystemAdmins(config);
 streamMetadataCache = new StreamMetadataCache(systemAdmins, METADATA_CACHE_TTL_MS, SystemClock.instance());
}

代码示例来源:origin: org.apache.samza/samza-core_2.11

ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) {
 this.config = config;
 this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
 this.processorId = createProcessorId(config);
 this.zkUtils = zkUtils;
 // setup a listener for a session state change
 // we are mostly interested in "session closed" and "new session created" events
 zkUtils.getZkClient().subscribeStateChanges(new ZkSessionStateChangedListener());
 leaderElector = new ZkLeaderElector(processorId, zkUtils);
 leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
 this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
 this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId);
 debounceTimer = new ScheduleAfterDebounceTime(processorId);
 debounceTimer.setScheduledTaskCallback(throwable -> {
   LOG.error("Received exception in debounce timer! Stopping the job coordinator", throwable);
   stop();
  });
 this.barrier =  new ZkBarrierForVersionUpgrade(zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(), zkUtils, new ZkBarrierListenerImpl(), debounceTimer);
 systemAdmins = new SystemAdmins(config);
 streamMetadataCache = new StreamMetadataCache(systemAdmins, METADATA_CACHE_TTL_MS, SystemClock.instance());
}

相关文章