org.apache.samza.zk.ZkKeyBuilder.getJobModelVersionBarrierPrefix()方法的使用及代码示例

x33g5p2x  于2022-02-05 转载在 其他  
字(19.3k)|赞(0)|评价(0)|浏览(126)

本文整理了Java中org.apache.samza.zk.ZkKeyBuilder.getJobModelVersionBarrierPrefix()方法的一些代码示例,展示了ZkKeyBuilder.getJobModelVersionBarrierPrefix()的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ZkKeyBuilder.getJobModelVersionBarrierPrefix()方法的具体详情如下:
包路径:org.apache.samza.zk.ZkKeyBuilder
类名称:ZkKeyBuilder
方法名:getJobModelVersionBarrierPrefix

ZkKeyBuilder.getJobModelVersionBarrierPrefix介绍

暂无

代码示例

代码示例来源:origin: apache/samza

/**
 * Prunes barrier znodes that live under the job-model-version barrier prefix.
 *
 * <p>Lists the children of {@code keyBuilder.getJobModelVersionBarrierPrefix()} and hands them to
 * {@code deleteOldVersionPath}, together with a comparator that orders barrier names by the
 * numeric version extracted with {@code ZkBarrierForVersionUpgrade.getVersion}.
 *
 * @param numVersionsToLeave forwarded unchanged to {@code deleteOldVersionPath}; presumably the
 *     number of newest barrier versions to keep (confirm against that helper)
 */
void deleteOldBarrierVersions(int numVersionsToLeave) {
 // read current list of barriers
 String path = keyBuilder.getJobModelVersionBarrierPrefix();
 LOG.info("About to delete old barrier paths from " + path);
 List<String> znodeIds = zkClient.getChildren(path);
 LOG.info("List of all zkNodes: " + znodeIds);
 deleteOldVersionPath(path, znodeIds, numVersionsToLeave, new Comparator<String>() {
  @Override
  public int compare(String o1, String o2) {
   // barrier's name format is barrier_<num>.
   // Integer.compare is used instead of subtraction: a difference of two ints can overflow
   // and flip sign, which would violate the Comparator contract.
   return Integer.compare(ZkBarrierForVersionUpgrade.getVersion(o1), ZkBarrierForVersionUpgrade.getVersion(o2));
  }
 });
}

代码示例来源:origin: org.apache.samza/samza-core

/**
 * Prunes barrier znodes that live under the job-model-version barrier prefix.
 *
 * <p>Lists the children of {@code keyBuilder.getJobModelVersionBarrierPrefix()} and hands them to
 * {@code deleteOldVersionPath}, together with a comparator that orders barrier names by the
 * numeric version extracted with {@code ZkBarrierForVersionUpgrade.getVersion}.
 *
 * @param numVersionsToLeave forwarded unchanged to {@code deleteOldVersionPath}; presumably the
 *     number of newest barrier versions to keep (confirm against that helper)
 */
void deleteOldBarrierVersions(int numVersionsToLeave) {
 // read current list of barriers
 String path = keyBuilder.getJobModelVersionBarrierPrefix();
 LOG.info("About to delete old barrier paths from " + path);
 List<String> znodeIds = zkClient.getChildren(path);
 LOG.info("List of all zkNodes: " + znodeIds);
 deleteOldVersionPath(path, znodeIds, numVersionsToLeave, new Comparator<String>() {
  @Override
  public int compare(String o1, String o2) {
   // barrier's name format is barrier_<num>.
   // Integer.compare is used instead of subtraction: a difference of two ints can overflow
   // and flip sign, which would violate the Comparator contract.
   return Integer.compare(ZkBarrierForVersionUpgrade.getVersion(o1), ZkBarrierForVersionUpgrade.getVersion(o2));
  }
 });
}

代码示例来源:origin: org.apache.samza/samza-core_2.10

/**
 * Prunes barrier znodes that live under the job-model-version barrier prefix.
 *
 * <p>Lists the children of {@code keyBuilder.getJobModelVersionBarrierPrefix()} and hands them to
 * {@code deleteOldVersionPath}, together with a comparator that orders barrier names by the
 * numeric version extracted with {@code ZkBarrierForVersionUpgrade.getVersion}.
 *
 * @param numVersionsToLeave forwarded unchanged to {@code deleteOldVersionPath}; presumably the
 *     number of newest barrier versions to keep (confirm against that helper)
 */
void deleteOldBarrierVersions(int numVersionsToLeave) {
 // read current list of barriers
 String path = keyBuilder.getJobModelVersionBarrierPrefix();
 LOG.info("About to delete old barrier paths from " + path);
 List<String> znodeIds = zkClient.getChildren(path);
 LOG.info("List of all zkNodes: " + znodeIds);
 deleteOldVersionPath(path, znodeIds, numVersionsToLeave, new Comparator<String>() {
  @Override
  public int compare(String o1, String o2) {
   // barrier's name format is barrier_<num>.
   // Integer.compare is used instead of subtraction: a difference of two ints can overflow
   // and flip sign, which would violate the Comparator contract.
   return Integer.compare(ZkBarrierForVersionUpgrade.getVersion(o1), ZkBarrierForVersionUpgrade.getVersion(o2));
  }
 });
}

代码示例来源:origin: org.apache.samza/samza-core_2.12

/**
 * Prunes barrier znodes that live under the job-model-version barrier prefix.
 *
 * <p>Lists the children of {@code keyBuilder.getJobModelVersionBarrierPrefix()} and hands them to
 * {@code deleteOldVersionPath}, together with a comparator that orders barrier names by the
 * numeric version extracted with {@code ZkBarrierForVersionUpgrade.getVersion}.
 *
 * @param numVersionsToLeave forwarded unchanged to {@code deleteOldVersionPath}; presumably the
 *     number of newest barrier versions to keep (confirm against that helper)
 */
void deleteOldBarrierVersions(int numVersionsToLeave) {
 // read current list of barriers
 String path = keyBuilder.getJobModelVersionBarrierPrefix();
 LOG.info("About to delete old barrier paths from " + path);
 List<String> znodeIds = zkClient.getChildren(path);
 LOG.info("List of all zkNodes: " + znodeIds);
 deleteOldVersionPath(path, znodeIds, numVersionsToLeave, new Comparator<String>() {
  @Override
  public int compare(String o1, String o2) {
   // barrier's name format is barrier_<num>.
   // Integer.compare is used instead of subtraction: a difference of two ints can overflow
   // and flip sign, which would violate the Comparator contract.
   return Integer.compare(ZkBarrierForVersionUpgrade.getVersion(o1), ZkBarrierForVersionUpgrade.getVersion(o2));
  }
 });
}

代码示例来源:origin: org.apache.samza/samza-core_2.11

/**
 * Prunes barrier znodes that live under the job-model-version barrier prefix.
 *
 * <p>Lists the children of {@code keyBuilder.getJobModelVersionBarrierPrefix()} and hands them to
 * {@code deleteOldVersionPath}, together with a comparator that orders barrier names by the
 * numeric version extracted with {@code ZkBarrierForVersionUpgrade.getVersion}.
 *
 * @param numVersionsToLeave forwarded unchanged to {@code deleteOldVersionPath}; presumably the
 *     number of newest barrier versions to keep (confirm against that helper)
 */
void deleteOldBarrierVersions(int numVersionsToLeave) {
 // read current list of barriers
 String path = keyBuilder.getJobModelVersionBarrierPrefix();
 LOG.info("About to delete old barrier paths from " + path);
 List<String> znodeIds = zkClient.getChildren(path);
 LOG.info("List of all zkNodes: " + znodeIds);
 deleteOldVersionPath(path, znodeIds, numVersionsToLeave, new Comparator<String>() {
  @Override
  public int compare(String o1, String o2) {
   // barrier's name format is barrier_<num>.
   // Integer.compare is used instead of subtraction: a difference of two ints can overflow
   // and flip sign, which would violate the Comparator contract.
   return Integer.compare(ZkBarrierForVersionUpgrade.getVersion(o1), ZkBarrierForVersionUpgrade.getVersion(o2));
  }
 });
}

代码示例来源:origin: apache/samza

/**
 * Checks that {@code LeaderElectorListenerImpl.onBecomingLeader()} starts the
 * {@code StreamPartitionCountMonitor} exposed by the coordinator.
 */
@Test
public void testShouldStartPartitionCountMonitorOnBecomingLeader() {
 // Stand-ins for every collaborator the coordinator touches.
 ZkClient zkClientMock = Mockito.mock(ZkClient.class);
 ZkKeyBuilder keyBuilderMock = Mockito.mock(ZkKeyBuilder.class);
 ZkUtils zkUtilsMock = Mockito.mock(ZkUtils.class);
 ScheduleAfterDebounceTime debounceTimerMock = Mockito.mock(ScheduleAfterDebounceTime.class);
 StreamPartitionCountMonitor partitionCountMonitor = Mockito.mock(StreamPartitionCountMonitor.class);

 // Stub the ZkUtils accessors before the coordinator is constructed with it.
 when(keyBuilderMock.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
 when(zkUtilsMock.getKeyBuilder()).thenReturn(keyBuilderMock);
 when(zkUtilsMock.getZkClient()).thenReturn(zkClientMock);
 when(zkUtilsMock.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));

 ZkJobCoordinator coordinatorSpy =
   Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtilsMock));
 coordinatorSpy.debounceTimer = debounceTimerMock;
 coordinatorSpy.streamPartitionCountMonitor = partitionCountMonitor;
 when(coordinatorSpy.getPartitionCountMonitor()).thenReturn(partitionCountMonitor);

 // Drive the leader-election callback directly.
 ZkJobCoordinator.LeaderElectorListenerImpl leaderListener = coordinatorSpy.new LeaderElectorListenerImpl();
 leaderListener.onBecomingLeader();

 Mockito.verify(partitionCountMonitor).start();
}

代码示例来源:origin: apache/samza

/**
 * Checks that {@code ZkJobCoordinator.stop()} stops the {@code StreamPartitionCountMonitor}
 * held by the coordinator.
 */
@Test
 public void testShouldStopPartitionCountMonitorWhenStoppingTheJobCoordinator() {
  // Stand-ins for every collaborator the coordinator touches.
  ZkClient zkClientMock = Mockito.mock(ZkClient.class);
  ZkKeyBuilder keyBuilderMock = Mockito.mock(ZkKeyBuilder.class);
  ZkUtils zkUtilsMock = Mockito.mock(ZkUtils.class);
  ScheduleAfterDebounceTime debounceTimerMock = Mockito.mock(ScheduleAfterDebounceTime.class);
  StreamPartitionCountMonitor partitionCountMonitor = Mockito.mock(StreamPartitionCountMonitor.class);

  // Stub the ZkUtils accessors before the coordinator is constructed with it.
  when(keyBuilderMock.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
  when(zkUtilsMock.getKeyBuilder()).thenReturn(keyBuilderMock);
  when(zkUtilsMock.getZkClient()).thenReturn(zkClientMock);
  when(zkUtilsMock.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));

  ZkJobCoordinator coordinatorSpy =
    Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtilsMock));
  coordinatorSpy.debounceTimer = debounceTimerMock;
  coordinatorSpy.streamPartitionCountMonitor = partitionCountMonitor;

  coordinatorSpy.stop();

  Mockito.verify(partitionCountMonitor).stop();
 }
}

代码示例来源:origin: apache/samza

/**
 * Checks that a {@code KeeperState.Expired} notification delivered to
 * {@code ZkSessionStateChangedListener} stops the {@code StreamPartitionCountMonitor}.
 */
@Test
public void testShouldStopPartitionCountMonitorOnSessionExpiration() {
 // Stand-ins for every collaborator the coordinator touches.
 ZkClient zkClientMock = Mockito.mock(ZkClient.class);
 ZkKeyBuilder keyBuilderMock = Mockito.mock(ZkKeyBuilder.class);
 ZkUtils zkUtilsMock = Mockito.mock(ZkUtils.class);
 ScheduleAfterDebounceTime debounceTimerMock = Mockito.mock(ScheduleAfterDebounceTime.class);
 StreamPartitionCountMonitor partitionCountMonitor = Mockito.mock(StreamPartitionCountMonitor.class);

 // Stub the ZkUtils accessors before the coordinator is constructed with it.
 when(keyBuilderMock.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
 when(zkUtilsMock.getKeyBuilder()).thenReturn(keyBuilderMock);
 when(zkUtilsMock.getZkClient()).thenReturn(zkClientMock);
 when(zkUtilsMock.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));

 ZkJobCoordinator coordinatorSpy =
   Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtilsMock));
 coordinatorSpy.debounceTimer = debounceTimerMock;
 coordinatorSpy.streamPartitionCountMonitor = partitionCountMonitor;

 // Deliver the session-expired event straight to the listener.
 ZkSessionStateChangedListener sessionListener = coordinatorSpy.new ZkSessionStateChangedListener();
 sessionListener.handleStateChanged(Watcher.Event.KeeperState.Expired);

 Mockito.verify(partitionCountMonitor).stop();
}

代码示例来源:origin: apache/samza

/**
 * Feeds a job-model version change to {@code ZkJobModelVersionChangeHandler} where the stubbed
 * job model has no containers, and expects the coordinator's {@code stop()} to be invoked
 * (observed through a latch) within one minute.
 */
@Test
public void testFollowerShouldStopWhenNotPartOfGeneratedJobModel() throws Exception {
 // Released by the stubbed stop() below.
 CountDownLatch stopLatch = new CountDownLatch(1);

 ZkClient zkClientMock = Mockito.mock(ZkClient.class);
 ZkKeyBuilder keyBuilderMock = Mockito.mock(ZkKeyBuilder.class);
 ZkUtils zkUtilsMock = Mockito.mock(ZkUtils.class);

 // Stub the ZkUtils accessors before the coordinator is constructed with it.
 when(keyBuilderMock.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
 when(zkUtilsMock.getKeyBuilder()).thenReturn(keyBuilderMock);
 when(zkUtilsMock.getZkClient()).thenReturn(zkClientMock);
 when(zkUtilsMock.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));

 ZkJobCoordinator coordinatorSpy =
   Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtilsMock));

 // Replace the real stop() with a stub that only signals the latch.
 doAnswer(new Answer<Void>() {
  @Override
  public Void answer(InvocationOnMock ignoredInvocation) {
   stopLatch.countDown();
   return null;
  }
 }).when(coordinatorSpy).stop();

 final ZkJobCoordinator.ZkJobModelVersionChangeHandler versionChangeHandler =
   coordinatorSpy.new ZkJobModelVersionChangeHandler(zkUtilsMock);
 versionChangeHandler.doHandleDataChange("path", TEST_JOB_MODEL_VERSION);

 verify(coordinatorSpy, Mockito.atMost(1)).stop();
 boolean stoppedInTime = stopLatch.await(1, TimeUnit.MINUTES);
 assertTrue("Timed out waiting for JobCoordinator to stop", stoppedInTime);
}

代码示例来源:origin: apache/samza

/**
 * Checks the reaction to {@code KeeperState.Expired}: the ZkUtils generation is incremented,
 * all scheduled debounce actions are cancelled, a {@code "ZK_SESSION_EXPIRED"} action is
 * scheduled with zero delay, and the session-expiration counter reads 1.
 */
@Test
public void testShouldRemoveBufferedEventsInDebounceQueueOnSessionExpiration() {
 // Stand-ins for every collaborator the coordinator touches.
 ZkClient zkClientMock = Mockito.mock(ZkClient.class);
 ZkKeyBuilder keyBuilderMock = Mockito.mock(ZkKeyBuilder.class);
 ZkUtils zkUtilsMock = Mockito.mock(ZkUtils.class);
 ScheduleAfterDebounceTime debounceTimerMock = Mockito.mock(ScheduleAfterDebounceTime.class);

 // Stub the ZkUtils accessors before the coordinator is constructed with it.
 when(keyBuilderMock.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
 when(zkUtilsMock.getKeyBuilder()).thenReturn(keyBuilderMock);
 when(zkUtilsMock.getZkClient()).thenReturn(zkClientMock);
 when(zkUtilsMock.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));

 ZkJobCoordinator coordinatorSpy =
   Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtilsMock));
 coordinatorSpy.debounceTimer = debounceTimerMock;
 // Swap in metrics backed by a registry map so the counter can be read back below.
 coordinatorSpy.zkSessionMetrics = new ZkSessionMetrics(new MetricsRegistryMap());

 final ZkSessionStateChangedListener sessionListener = coordinatorSpy.new ZkSessionStateChangedListener();
 sessionListener.handleStateChanged(Watcher.Event.KeeperState.Expired);

 verify(zkUtilsMock).incGeneration();
 verify(debounceTimerMock).cancelAllScheduledActions();
 verify(debounceTimerMock)
   .scheduleAfterDebounceTime(Mockito.eq("ZK_SESSION_EXPIRED"), Mockito.eq(0L), Mockito.any(Runnable.class));
 Assert.assertEquals(1, coordinatorSpy.zkSessionMetrics.zkSessionExpirations.getCount());
}

代码示例来源:origin: apache/samza

/**
 * Creates barriers for versions 200 through 209, calls {@code deleteOldBarrierVersions(5)},
 * and expects only {@code barrier_205} .. {@code barrier_209} to remain under the barrier root.
 */
@Test
public void testCleanUpZkBarrierVersion() {
 String barrierRoot = zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix();
 zkUtils.getZkClient().createPersistent(barrierRoot, true);
 ZkBarrierForVersionUpgrade versionBarrier = new ZkBarrierForVersionUpgrade(barrierRoot, zkUtils, null, null);

 // Ten barrier versions, each with three participants named <version>a/b/c.
 for (int version = 200; version <= 209; version++) {
  List<String> participants = new ArrayList<>(Arrays.asList(version + "a", version + "b", version + "c"));
  versionBarrier.create(String.valueOf(version), participants);
 }

 zkUtils.deleteOldBarrierVersions(5);

 List<String> remaining = zkUtils.getZkClient().getChildren(barrierRoot);
 Collections.sort(remaining);
 List<String> expected =
   Arrays.asList("barrier_205", "barrier_206", "barrier_207", "barrier_208", "barrier_209");
 Assert.assertEquals(expected, remaining);
}

代码示例来源:origin: apache/samza

/**
 * Sends Disconnected, SyncConnected and AuthFailed state changes followed by a session
 * establishment error to {@code ZkSessionStateChangedListener}, and checks the resulting
 * {@code ZkSessionMetrics} counters (1 disconnect, 1 sync-connected, 2 errors in total).
 */
@Test
public void testZookeeperSessionMetricsAreUpdatedCoorrectly() {
 // Stand-ins for every collaborator the coordinator touches.
 ZkClient zkClientMock = Mockito.mock(ZkClient.class);
 ZkKeyBuilder keyBuilderMock = Mockito.mock(ZkKeyBuilder.class);
 ZkUtils zkUtilsMock = Mockito.mock(ZkUtils.class);
 ScheduleAfterDebounceTime debounceTimerMock = Mockito.mock(ScheduleAfterDebounceTime.class);

 // Stub the ZkUtils accessors before the coordinator is constructed with it.
 when(keyBuilderMock.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
 when(zkUtilsMock.getKeyBuilder()).thenReturn(keyBuilderMock);
 when(zkUtilsMock.getZkClient()).thenReturn(zkClientMock);
 when(zkUtilsMock.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));

 ZkJobCoordinator coordinatorSpy =
   Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtilsMock));
 coordinatorSpy.debounceTimer = debounceTimerMock;
 // Swap in metrics backed by a registry map so the counters can be read back below.
 coordinatorSpy.zkSessionMetrics = new ZkSessionMetrics(new MetricsRegistryMap());

 final ZkSessionStateChangedListener sessionListener = coordinatorSpy.new ZkSessionStateChangedListener();

 // Three state transitions; only AuthFailed is expected to count as an error so far.
 sessionListener.handleStateChanged(Watcher.Event.KeeperState.Disconnected);
 sessionListener.handleStateChanged(Watcher.Event.KeeperState.SyncConnected);
 sessionListener.handleStateChanged(Watcher.Event.KeeperState.AuthFailed);
 Assert.assertEquals(1, coordinatorSpy.zkSessionMetrics.zkSessionErrors.getCount());

 // A session establishment error is expected to bump the error counter once more.
 sessionListener.handleSessionEstablishmentError(new SamzaException("Test exception"));
 Assert.assertEquals(1, coordinatorSpy.zkSessionMetrics.zkSessionDisconnects.getCount());
 Assert.assertEquals(1, coordinatorSpy.zkSessionMetrics.zkSyncConnected.getCount());
 Assert.assertEquals(2, coordinatorSpy.zkSessionMetrics.zkSessionErrors.getCount());
}

代码示例来源:origin: apache/samza

/**
 * Checks the job-model related paths produced by a {@code ZkKeyBuilder} created with the
 * path prefix {@code "test"}.
 */
@Test
 public void testJobModelPath() {
  ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test");
  // Every expected path below starts with this generation root.
  String generationRoot = "/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH;
  String version = "2";

  String expectedVersionPath = generationRoot + "/jobModelVersion";
  Assert.assertEquals(expectedVersionPath, keyBuilder.getJobModelVersionPath());

  String expectedModelsPrefix = generationRoot + "/jobModels";
  Assert.assertEquals(expectedModelsPrefix, keyBuilder.getJobModelPathPrefix());

  String expectedModelPath = generationRoot + "/jobModels/" + version;
  Assert.assertEquals(expectedModelPath, keyBuilder.getJobModelPath(version));

  String expectedBarrierPrefix =
    generationRoot + "/" + ZkKeyBuilder.JOB_MODEL_UPGRADE_BARRIER_PATH + "/versionBarriers";
  Assert.assertEquals(expectedBarrierPrefix, keyBuilder.getJobModelVersionBarrierPrefix());
 }
}

代码示例来源:origin: org.apache.samza/samza-core_2.12

/**
 * Wires up a ZooKeeper-backed job coordinator.
 *
 * <p>In order, this constructor stores the config and {@code ZkUtils} handle, derives the
 * processor id via {@code createProcessorId(config)}, subscribes a session-state listener on the
 * ZkClient, creates the leader elector and its listener, reads the debounce time from
 * {@code JobConfig}, loads the metrics reporters, creates the debounce timer (whose failure
 * callback stops this coordinator), creates the version-upgrade barrier rooted at the key
 * builder's barrier prefix, and finally creates the system admins and stream metadata cache.
 *
 * @param config job configuration; used for the processor id, debounce time, metrics reporters
 *     and system admins
 * @param metricsRegistry registry handed to {@code ZkJobCoordinatorMetrics}
 * @param zkUtils supplies the ZkClient and key builder used here; also passed to the leader
 *     elector and the barrier
 */
ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) {
 this.config = config;
 this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
 this.processorId = createProcessorId(config);
 this.zkUtils = zkUtils;
 // setup a listener for a session state change
 // we are mostly interested in "session closed" and "new session created" events
 // NOTE(review): the listener objects created in this constructor are registered while this
 // instance is still being initialized - confirm no callback can fire before it returns.
 zkUtils.getZkClient().subscribeStateChanges(new ZkSessionStateChangedListener());
 leaderElector = new ZkLeaderElector(processorId, zkUtils);
 leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
 this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
 this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId);
 debounceTimer = new ScheduleAfterDebounceTime(processorId);
 // Any throwable reported by the debounce timer is logged and then stops the coordinator.
 debounceTimer.setScheduledTaskCallback(throwable -> {
   LOG.error("Received exception in debounce timer! Stopping the job coordinator", throwable);
   stop();
  });
 // The barrier is given the debounce timer, so it must be created after the timer above.
 this.barrier =  new ZkBarrierForVersionUpgrade(zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(), zkUtils, new ZkBarrierListenerImpl(), debounceTimer);
 systemAdmins = new SystemAdmins(config);
 streamMetadataCache = new StreamMetadataCache(systemAdmins, METADATA_CACHE_TTL_MS, SystemClock.instance());
}

代码示例来源:origin: org.apache.samza/samza-core_2.11

/**
 * Wires up a ZooKeeper-backed job coordinator.
 *
 * <p>In order, this constructor stores the config and {@code ZkUtils} handle, derives the
 * processor id via {@code createProcessorId(config)}, subscribes a session-state listener on the
 * ZkClient, creates the leader elector and its listener, reads the debounce time from
 * {@code JobConfig}, loads the metrics reporters, creates the debounce timer (whose failure
 * callback stops this coordinator), creates the version-upgrade barrier rooted at the key
 * builder's barrier prefix, and finally creates the system admins and stream metadata cache.
 *
 * @param config job configuration; used for the processor id, debounce time, metrics reporters
 *     and system admins
 * @param metricsRegistry registry handed to {@code ZkJobCoordinatorMetrics}
 * @param zkUtils supplies the ZkClient and key builder used here; also passed to the leader
 *     elector and the barrier
 */
ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) {
 this.config = config;
 this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
 this.processorId = createProcessorId(config);
 this.zkUtils = zkUtils;
 // setup a listener for a session state change
 // we are mostly interested in "session closed" and "new session created" events
 // NOTE(review): the listener objects created in this constructor are registered while this
 // instance is still being initialized - confirm no callback can fire before it returns.
 zkUtils.getZkClient().subscribeStateChanges(new ZkSessionStateChangedListener());
 leaderElector = new ZkLeaderElector(processorId, zkUtils);
 leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
 this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
 this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId);
 debounceTimer = new ScheduleAfterDebounceTime(processorId);
 // Any throwable reported by the debounce timer is logged and then stops the coordinator.
 debounceTimer.setScheduledTaskCallback(throwable -> {
   LOG.error("Received exception in debounce timer! Stopping the job coordinator", throwable);
   stop();
  });
 // The barrier is given the debounce timer, so it must be created after the timer above.
 this.barrier =  new ZkBarrierForVersionUpgrade(zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(), zkUtils, new ZkBarrierListenerImpl(), debounceTimer);
 systemAdmins = new SystemAdmins(config);
 streamMetadataCache = new StreamMetadataCache(systemAdmins, METADATA_CACHE_TTL_MS, SystemClock.instance());
}

代码示例来源:origin: org.apache.samza/samza-core

/**
 * Wires up a ZooKeeper-backed job coordinator.
 *
 * <p>In order, this constructor stores the config and {@code ZkUtils} handle, derives the
 * processor id via {@code createProcessorId(config)}, subscribes a session-state listener on the
 * ZkClient, creates the leader elector and its listener, reads the debounce time from
 * {@code JobConfig}, loads the metrics reporters, creates the debounce timer (whose failure
 * callback stops this coordinator), creates the version-upgrade barrier rooted at the key
 * builder's barrier prefix, and finally creates the system admins and stream metadata cache.
 *
 * @param config job configuration; used for the processor id, debounce time, metrics reporters
 *     and system admins
 * @param metricsRegistry registry handed to {@code ZkJobCoordinatorMetrics}
 * @param zkUtils supplies the ZkClient and key builder used here; also passed to the leader
 *     elector and the barrier
 */
ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) {
 this.config = config;
 this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
 this.processorId = createProcessorId(config);
 this.zkUtils = zkUtils;
 // setup a listener for a session state change
 // we are mostly interested in "session closed" and "new session created" events
 // NOTE(review): the listener objects created in this constructor are registered while this
 // instance is still being initialized - confirm no callback can fire before it returns.
 zkUtils.getZkClient().subscribeStateChanges(new ZkSessionStateChangedListener());
 leaderElector = new ZkLeaderElector(processorId, zkUtils);
 leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
 this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
 this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId);
 debounceTimer = new ScheduleAfterDebounceTime(processorId);
 // Any throwable reported by the debounce timer is logged and then stops the coordinator.
 debounceTimer.setScheduledTaskCallback(throwable -> {
   LOG.error("Received exception in debounce timer! Stopping the job coordinator", throwable);
   stop();
  });
 // The barrier is given the debounce timer, so it must be created after the timer above.
 this.barrier =  new ZkBarrierForVersionUpgrade(zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(), zkUtils, new ZkBarrierListenerImpl(), debounceTimer);
 systemAdmins = new SystemAdmins(config);
 streamMetadataCache = new StreamMetadataCache(systemAdmins, METADATA_CACHE_TTL_MS, SystemClock.instance());
}

代码示例来源:origin: org.apache.samza/samza-core_2.10

/**
 * Wires up a ZooKeeper-backed job coordinator.
 *
 * <p>In order, this constructor stores the config and {@code ZkUtils} handle, derives the
 * processor id via {@code createProcessorId(config)}, subscribes a session-state listener on the
 * ZkClient, creates the leader elector and its listener, reads the debounce time from
 * {@code JobConfig}, loads the metrics reporters, creates the debounce timer (whose failure
 * callback stops this coordinator), creates the version-upgrade barrier rooted at the key
 * builder's barrier prefix, and finally creates the system admins and stream metadata cache.
 *
 * @param config job configuration; used for the processor id, debounce time, metrics reporters
 *     and system admins
 * @param metricsRegistry registry handed to {@code ZkJobCoordinatorMetrics}
 * @param zkUtils supplies the ZkClient and key builder used here; also passed to the leader
 *     elector and the barrier
 */
ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) {
 this.config = config;
 this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
 this.processorId = createProcessorId(config);
 this.zkUtils = zkUtils;
 // setup a listener for a session state change
 // we are mostly interested in "session closed" and "new session created" events
 // NOTE(review): the listener objects created in this constructor are registered while this
 // instance is still being initialized - confirm no callback can fire before it returns.
 zkUtils.getZkClient().subscribeStateChanges(new ZkSessionStateChangedListener());
 leaderElector = new ZkLeaderElector(processorId, zkUtils);
 leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
 this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
 this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId);
 debounceTimer = new ScheduleAfterDebounceTime(processorId);
 // Any throwable reported by the debounce timer is logged and then stops the coordinator.
 debounceTimer.setScheduledTaskCallback(throwable -> {
   LOG.error("Received exception in debounce timer! Stopping the job coordinator", throwable);
   stop();
  });
 // The barrier is given the debounce timer, so it must be created after the timer above.
 this.barrier =  new ZkBarrierForVersionUpgrade(zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(), zkUtils, new ZkBarrierListenerImpl(), debounceTimer);
 systemAdmins = new SystemAdmins(config);
 streamMetadataCache = new StreamMetadataCache(systemAdmins, METADATA_CACHE_TTL_MS, SystemClock.instance());
}

代码示例来源:origin: apache/samza

/**
 * Wires up a ZooKeeper-backed job coordinator for an explicitly supplied processor id.
 *
 * <p>In order, this constructor stores the config, processor id and {@code ZkUtils} handle,
 * creates the coordinator and session metrics, subscribes a session-state listener on the
 * ZkClient, creates the leader elector and its listener, reads the debounce time from
 * {@code JobConfig}, creates the debounce timer (whose failure callback stops this coordinator),
 * creates the version-upgrade barrier rooted at the key builder's barrier prefix, creates the
 * system admins and stream metadata cache, and resolves the location id through the
 * {@code LocationIdProviderFactory} named in the job config.
 *
 * @param processorId identifier of this processor; passed to the leader elector and debounce timer
 * @param config job configuration; used for the debounce time, system admins and location id
 * @param metricsRegistry registry handed to {@code ZkJobCoordinatorMetrics} and {@code ZkSessionMetrics}
 * @param zkUtils supplies the ZkClient and key builder used here; also passed to the leader
 *     elector and the barrier
 */
ZkJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) {
 this.config = config;
 this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
 this.zkSessionMetrics = new ZkSessionMetrics(metricsRegistry);
 this.processorId = processorId;
 this.zkUtils = zkUtils;
 // setup a listener for a session state change
 // we are mostly interested in "session closed" and "new session created" events
 // NOTE(review): the listener objects created in this constructor are registered while this
 // instance is still being initialized - confirm no callback can fire before it returns.
 zkUtils.getZkClient().subscribeStateChanges(new ZkSessionStateChangedListener());
 leaderElector = new ZkLeaderElector(processorId, zkUtils);
 leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
 this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
 debounceTimer = new ScheduleAfterDebounceTime(processorId);
 // Any throwable reported by the debounce timer is logged and then stops the coordinator.
 debounceTimer.setScheduledTaskCallback(throwable -> {
   LOG.error("Received exception in debounce timer! Stopping the job coordinator", throwable);
   stop();
  });
 // The barrier is given the debounce timer, so it must be created after the timer above.
 this.barrier =  new ZkBarrierForVersionUpgrade(zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(), zkUtils, new ZkBarrierListenerImpl(), debounceTimer);
 systemAdmins = new SystemAdmins(config);
 streamMetadataCache = new StreamMetadataCache(systemAdmins, METADATA_CACHE_TTL_MS, SystemClock.instance());
 // Instantiate the configured LocationIdProviderFactory by class name and ask it for this
 // processor's location id.
 // NOTE(review): JobConfig is constructed a second time here; a single shared instance would
 // presumably suffice - confirm JobConfig construction has no side effects before changing.
 LocationIdProviderFactory locationIdProviderFactory = Util.getObj(new JobConfig(config).getLocationIdProviderFactory(), LocationIdProviderFactory.class);
 LocationIdProvider locationIdProvider = locationIdProviderFactory.getLocationIdProvider(config);
 this.locationId = locationIdProvider.getLocationId();
}

相关文章