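This article collects code examples for the Java method org.apache.samza.zk.ZkUtils.getSortedActiveProcessorsZnodes() and shows how ZkUtils.getSortedActiveProcessorsZnodes() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven and similar platforms and were extracted from selected projects, so they should serve as a useful reference. Details of ZkUtils.getSortedActiveProcessorsZnodes() are as follows: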
Package path: org.apache.samza.zk.ZkUtils
Class name: ZkUtils
Method name: getSortedActiveProcessorsZnodes
Description: Gets the sorted list of currently active/registered processors (znodes).
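To make "sorted" concrete: ZooKeeper sequential znodes carry a zero-padded, ten-digit sequence suffix (the tests further down use names such as 0000000000), so a plain lexicographic sort of the child names also orders them by creation sequence. The following is a minimal sketch of that idea using plain strings only. The class and method names are hypothetical, and this is not the Samza implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration; not part of Samza.
class SortedZnodesSketch {

    /** Returns a sorted copy of the given child znode names; the input list is left untouched. */
    static List<String> sortZnodes(List<String> children) {
        List<String> sorted = new ArrayList<>(children);
        // For zero-padded names, lexicographic order matches sequence order.
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList("0000000002", "0000000000", "0000000001");
        // prints [0000000000, 0000000001, 0000000002]
        System.out.println(sortZnodes(children));
    }
}
```

Returning a copy keeps the caller's list unchanged, which matters when the input is an immutable or shared list.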
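Code example source: origin: apache/samza (identical copies: org.apache.samza/samza-core, samza-core_2.10, samza-core_2.11, samza-core_2.12)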
/**
 * Method is used to get the list of currently active/registered processor ids
 * @return List of processorIds
 */
public List<String> getSortedActiveProcessorsIDs() {
  return getActiveProcessorsIDs(getSortedActiveProcessorsZnodes());
}
Code example source: origin: org.apache.samza/samza-core (identical copies: apache/samza, samza-core_2.10, samza-core_2.11, samza-core_2.12)
/**
 * Fetches all the ephemeral processor nodes of a standalone job from zookeeper.
 * @return a list of {@link ProcessorNode}, where each ProcessorNode represents a registered stream processor.
 */
List<ProcessorNode> getAllProcessorNodes() {
  List<String> processorZNodes = getSortedActiveProcessorsZnodes();
  LOG.debug("Active ProcessorZNodes in zookeeper: {}.", processorZNodes);
  List<ProcessorNode> processorNodes = new ArrayList<>();
  for (String processorZNode: processorZNodes) {
    String ephemeralProcessorPath = String.format("%s/%s", keyBuilder.getProcessorsPath(), processorZNode);
    String data = readProcessorData(ephemeralProcessorPath);
    if (data != null) {
      processorNodes.add(new ProcessorNode(new ProcessorData(data), ephemeralProcessorPath));
    }
  }
  return processorNodes;
}
Code example source: origin: apache/samza
@Test
public void testGetActiveProcessors() {
  Assert.assertEquals(0, zkUtils.getSortedActiveProcessorsZnodes().size());
  zkUtils.registerProcessorAndGetId(new ProcessorData("processorData", "1"));
  Assert.assertEquals(1, zkUtils.getSortedActiveProcessorsZnodes().size());
}
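Code example source: origin: apache/samza (identical copies: org.apache.samza/samza-core, samza-core_2.10, samza-core_2.11, samza-core_2.12)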
String currentPath = zkUtils.registerProcessorAndGetId(new ProcessorData(hostName, processorIdStr));
List<String> children = zkUtils.getSortedActiveProcessorsZnodes();
LOG.debug(zLog("Current active processors - " + children));
int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(currentPath));
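The snippet above stops after computing `index`. In the common ZooKeeper leader-election recipe, that index decides the role: position 0 is the leader, any other position watches its immediate predecessor, and -1 means the processor's own znode is missing from the list (the test testUnregisteredProcessorInLeaderElection on this page expects a SamzaException in that situation). A minimal sketch of that decision with plain strings follows. `ElectionSketch`, `decide` and the returned strings are hypothetical names for illustration, and an IllegalStateException stands in for SamzaException; this is not Samza's ZkLeaderElector code.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the leader-election decision; not Samza code.
class ElectionSketch {

    /**
     * Decides the role of ownZnode within the sorted list of active znodes.
     * Returns "LEADER" for the first entry, otherwise "WATCH <predecessor>".
     */
    static String decide(List<String> sortedZnodes, String ownZnode) {
        int index = sortedZnodes.indexOf(ownZnode);
        if (index == -1) {
            // The processor never registered, or its ephemeral znode is gone.
            throw new IllegalStateException("znode " + ownZnode + " is not registered");
        }
        if (index == 0) {
            return "LEADER";
        }
        // Watching only the predecessor avoids waking every processor on each change.
        return "WATCH " + sortedZnodes.get(index - 1);
    }

    public static void main(String[] args) {
        List<String> znodes = Arrays.asList("0000000000", "0000000001", "0000000002");
        System.out.println(decide(znodes, "0000000000"));
        System.out.println(decide(znodes, "0000000002"));
    }
}
```

The decision only works if the list is sorted, which is why the caller relies on getSortedActiveProcessorsZnodes() rather than on an unordered child listing.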
Code example source: origin: apache/samza
@Test
public void testUnregisteredProcessorInLeaderElection() {
  String processorId = "1";
  ZkUtils mockZkUtils = mock(ZkUtils.class);
  when(mockZkUtils.getSortedActiveProcessorsZnodes()).thenReturn(new ArrayList<String>());
  Mockito.doNothing().when(mockZkUtils).validatePaths(any(String[].class));
  ZkKeyBuilder kb = mock(ZkKeyBuilder.class);
  when(kb.getProcessorsPath()).thenReturn("");
  when(mockZkUtils.getKeyBuilder()).thenReturn(kb);
  ZkLeaderElector leaderElector = new ZkLeaderElector(processorId, mockZkUtils, null);
  leaderElector.setLeaderElectorListener(() -> { });
  try {
    leaderElector.tryBecomeLeader();
    Assert.fail("Was expecting leader election to fail!");
  } catch (SamzaException e) {
    // No-op Expected
  }
}
Code example source: origin: apache/samza
@Test
public void testLeaderElectionRegistersProcessor() {
  List<String> activeProcessors = new ArrayList<String>() {
    {
      add("0000000000");
    }
  };
  ZkUtils mockZkUtils = mock(ZkUtils.class);
  when(mockZkUtils.registerProcessorAndGetId(any())).
      thenReturn(KEY_BUILDER.getProcessorsPath() + "/0000000000");
  when(mockZkUtils.getSortedActiveProcessorsZnodes()).thenReturn(activeProcessors);
  Mockito.doNothing().when(mockZkUtils).validatePaths(any(String[].class));
  ZkKeyBuilder kb = mock(ZkKeyBuilder.class);
  when(kb.getProcessorsPath()).thenReturn("");
  when(mockZkUtils.getKeyBuilder()).thenReturn(kb);
  ZkLeaderElector leaderElector = new ZkLeaderElector("1", mockZkUtils, null);
  BooleanResult isLeader = new BooleanResult();
  leaderElector.setLeaderElectorListener(() -> isLeader.res = true);
  leaderElector.tryBecomeLeader();
  Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> isLeader.res, 2, 100));
}
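The final assertion in the test above polls for the listener callback instead of checking the flag once, which suits a callback that may fire asynchronously. The body of TestZkUtils.testWithDelayBackOff is not shown on this page; the following is a sketch of one way such a helper could work, assuming its last two arguments are a starting delay and a maximum delay in milliseconds. `BackoffPollSketch` and `pollWithBackoff` are hypothetical names, not Samza APIs.

```java
import java.util.function.BooleanSupplier;

// Hypothetical polling helper for illustration; not Samza's TestZkUtils.
class BackoffPollSketch {

    /**
     * Polls the condition, doubling the sleep between attempts, until the
     * condition holds or the delay reaches maxDelayMs.
     */
    static boolean pollWithBackoff(BooleanSupplier condition, long startDelayMs, long maxDelayMs) {
        if (startDelayMs <= 0) {
            // A non-positive delay would never grow, so the loop could not end.
            throw new IllegalArgumentException("startDelayMs must be positive");
        }
        long delay = startDelayMs;
        while (delay < maxDelayMs) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
            delay *= 2;
        }
        // One last check after the final sleep.
        return condition.getAsBoolean();
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        boolean result = pollWithBackoff(() -> System.currentTimeMillis() - start > 10, 2, 100);
        System.out.println("condition met: " + result);
    }
}
```

With a starting delay of 2 ms and a maximum of 100 ms, this sketch sleeps 2, 4, 8, 16, 32 and 64 ms before giving up, so a failing condition costs roughly 126 ms.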
Code example source: origin: apache/samza
leaderElector3.setLeaderElectorListener(() -> isLeader3.res = true);
Assert.assertEquals(0, testZkUtils.getSortedActiveProcessorsZnodes().size());
Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader3.res, 2, 100));
Assert.assertEquals(3, testZkUtils.getSortedActiveProcessorsZnodes().size());
zkUtils3.close();
Assert.assertEquals(new ArrayList<String>(), testZkUtils.getSortedActiveProcessorsZnodes());
Code example source: origin: apache/samza
Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader3.res, 2, 100));
List<String> currentActiveProcessors = zkUtils1.getSortedActiveProcessorsZnodes();
Assert.assertEquals(3, currentActiveProcessors.size());
Assert.assertEquals(currentActiveProcessors, zkUtils1.getSortedActiveProcessorsZnodes());