Usage and code examples of the org.apache.samza.zk.ZkUtils.getSortedActiveProcessorsZnodes() method

x33g5p2x, reposted on 2022-02-05 under "Other"

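This article collects Java code examples of the org.apache.samza.zk.ZkUtils.getSortedActiveProcessorsZnodes() method and shows how ZkUtils.getSortedActiveProcessorsZnodes() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they are intended as a practical reference. Details of the ZkUtils.getSortedActiveProcessorsZnodes() method are as follows:
Package path: org.apache.samza.zk.ZkUtils
Class name: ZkUtils
Method name: getSortedActiveProcessorsZnodes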

Introduction to ZkUtils.getSortedActiveProcessorsZnodes

Returns the sorted list of currently active (registered) processors, identified by their znodes.
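To make "sorted" concrete: the test examples in this article use znode names such as "0000000000", which match ZooKeeper's zero-padded, 10-digit sequential suffix, so plain lexicographic order coincides with creation order. The following is a minimal sketch under that assumption, not Samza's actual implementation; `SortedZnodesSketch` and `sortZnodes` are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: not part of Samza.
class SortedZnodesSketch {

  // Returns a sorted copy of the child znode names; the input list is left untouched.
  static List<String> sortZnodes(List<String> children) {
    List<String> sorted = new ArrayList<>(children);
    Collections.sort(sorted); // natural (lexicographic) String ordering
    return sorted;
  }

  public static void main(String[] args) {
    // Assumed child names, in the arbitrary order a children lookup may return them.
    List<String> children = Arrays.asList("0000000002", "0000000000", "0000000001");
    System.out.println(sortZnodes(children)); // prints [0000000000, 0000000001, 0000000002]
  }
}
```

Because the names are fixed-width and zero-padded, no numeric parsing is needed for the ordering to be stable.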

Code examples

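Code example source: origin: apache/samza (the same method also appears in org.apache.samza/samza-core, samza-core_2.10, samza-core_2.11, and samza-core_2.12)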

/**
 * Method is used to get the list of currently active/registered processor ids
 * @return List of processorIds
 */
public List<String> getSortedActiveProcessorsIDs() {
 return getActiveProcessorsIDs(getSortedActiveProcessorsZnodes());
}

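Code example source: origin: org.apache.samza/samza-core (the same method also appears in apache/samza, samza-core_2.10, samza-core_2.11, and samza-core_2.12)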

/**
 * Fetches all the ephemeral processor nodes of a standalone job from zookeeper.
 * @return a list of {@link ProcessorNode}, where each ProcessorNode represents a registered stream processor.
 */
List<ProcessorNode> getAllProcessorNodes() {
 List<String> processorZNodes = getSortedActiveProcessorsZnodes();
 LOG.debug("Active ProcessorZNodes in zookeeper: {}.", processorZNodes);
 List<ProcessorNode> processorNodes = new ArrayList<>();
 for (String processorZNode: processorZNodes) {
  String ephemeralProcessorPath = String.format("%s/%s", keyBuilder.getProcessorsPath(), processorZNode);
  String data = readProcessorData(ephemeralProcessorPath);
  if (data != null) {
   processorNodes.add(new ProcessorNode(new ProcessorData(data), ephemeralProcessorPath));
  }
 }
 return processorNodes;
}

Code example source: origin: apache/samza

@Test
public void testGetActiveProcessors() {
 Assert.assertEquals(0, zkUtils.getSortedActiveProcessorsZnodes().size());
 zkUtils.registerProcessorAndGetId(new ProcessorData("processorData", "1"));
 Assert.assertEquals(1, zkUtils.getSortedActiveProcessorsZnodes().size());
}

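Code example source: origin: apache/samza (the same snippet also appears in org.apache.samza/samza-core, samza-core_2.10, samza-core_2.11, and samza-core_2.12)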

String currentPath = zkUtils.registerProcessorAndGetId(new ProcessorData(hostName, processorIdStr));
List<String> children = zkUtils.getSortedActiveProcessorsZnodes();
LOG.debug(zLog("Current active processors - " + children));
int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(currentPath));
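The snippet above stops once the index is computed. In the usual ZooKeeper leader-election recipe, a processor whose znode is first in the sorted list becomes leader, and every other processor watches the znode just before its own. The sketch below illustrates that recipe under stated assumptions and is not Samza's actual `ZkLeaderElector` code: `LeaderIndexSketch`, `predecessorToWatch`, and this re-implementation of `parseIdFromPath` are hypothetical, and `IllegalStateException` stands in for whatever exception the real code throws for an unregistered processor.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the standard leader-election recipe; not part of Samza.
class LeaderIndexSketch {

  // Extracts the last path segment, e.g. "/processors/0000000001" -> "0000000001".
  static String parseIdFromPath(String path) {
    return path.substring(path.lastIndexOf('/') + 1);
  }

  // Returns null if this processor is the leader, otherwise the znode name it should watch.
  static String predecessorToWatch(List<String> sortedChildren, String currentPath) {
    int index = sortedChildren.indexOf(parseIdFromPath(currentPath));
    if (index == -1) {
      // The processor's own znode is missing from the active list.
      throw new IllegalStateException("Processor is not registered: " + currentPath);
    }
    return index == 0 ? null : sortedChildren.get(index - 1);
  }

  public static void main(String[] args) {
    // Assumed znode names and processors path, for illustration only.
    List<String> children = Arrays.asList("0000000000", "0000000001", "0000000002");
    System.out.println(predecessorToWatch(children, "/processors/0000000000")); // prints null (leader)
    System.out.println(predecessorToWatch(children, "/processors/0000000002")); // prints 0000000001
  }
}
```

Watching only the immediate predecessor, rather than the leader itself, means that a single znode disappearing notifies just one processor.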

Code example source: origin: apache/samza

@Test
public void testUnregisteredProcessorInLeaderElection() {
 String processorId = "1";
 ZkUtils mockZkUtils = mock(ZkUtils.class);
 when(mockZkUtils.getSortedActiveProcessorsZnodes()).thenReturn(new ArrayList<String>());
 Mockito.doNothing().when(mockZkUtils).validatePaths(any(String[].class));
 ZkKeyBuilder kb = mock(ZkKeyBuilder.class);
 when(kb.getProcessorsPath()).thenReturn("");
 when(mockZkUtils.getKeyBuilder()).thenReturn(kb);
 ZkLeaderElector leaderElector = new ZkLeaderElector(processorId, mockZkUtils, null);
 leaderElector.setLeaderElectorListener(() -> { });
 try {
  leaderElector.tryBecomeLeader();
  Assert.fail("Was expecting leader election to fail!");
 } catch (SamzaException e) {
  // No-op Expected
 }
}

Code example source: origin: apache/samza

@Test
public void testLeaderElectionRegistersProcessor() {
 List<String> activeProcessors = new ArrayList<String>() {
  {
   add("0000000000");
  }
 };
 ZkUtils mockZkUtils = mock(ZkUtils.class);
 when(mockZkUtils.registerProcessorAndGetId(any())).
   thenReturn(KEY_BUILDER.getProcessorsPath() + "/0000000000");
 when(mockZkUtils.getSortedActiveProcessorsZnodes()).thenReturn(activeProcessors);
 Mockito.doNothing().when(mockZkUtils).validatePaths(any(String[].class));
 ZkKeyBuilder kb = mock(ZkKeyBuilder.class);
 when(kb.getProcessorsPath()).thenReturn("");
 when(mockZkUtils.getKeyBuilder()).thenReturn(kb);
 ZkLeaderElector leaderElector = new ZkLeaderElector("1", mockZkUtils, null);
 BooleanResult isLeader = new BooleanResult();
 leaderElector.setLeaderElectorListener(() -> isLeader.res = true);
 leaderElector.tryBecomeLeader();
 Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> isLeader.res, 2, 100));
}

Code example source: origin: apache/samza

leaderElector3.setLeaderElectorListener(() -> isLeader3.res = true);
Assert.assertEquals(0, testZkUtils.getSortedActiveProcessorsZnodes().size());
Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader3.res, 2, 100));
Assert.assertEquals(3, testZkUtils.getSortedActiveProcessorsZnodes().size());
zkUtils3.close();
Assert.assertEquals(new ArrayList<String>(), testZkUtils.getSortedActiveProcessorsZnodes());

Code example source: origin: apache/samza

Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader3.res, 2, 100));
List<String> currentActiveProcessors = zkUtils1.getSortedActiveProcessorsZnodes();
Assert.assertEquals(3, currentActiveProcessors.size());
Assert.assertEquals(currentActiveProcessors, zkUtils1.getSortedActiveProcessorsZnodes());
