本文整理了Java中org.apache.storm.utils.Utils.getGlobalStreamId()方法的一些代码示例,展示了Utils.getGlobalStreamId()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Utils.getGlobalStreamId()方法的具体详情如下:
包路径:org.apache.storm.utils.Utils
类名称:Utils
方法名:getGlobalStreamId
方法描述:暂无
代码示例来源:origin: apache/storm
/**
 * Builds the input subscriptions keyed on the {@code EVENTLOGGER_STREAM_ID} stream of
 * every bolt and spout in the topology.
 *
 * <p>Each entry uses a fields grouping on {@code "component-id"}.
 *
 * @param topology topology whose bolt and spout ids are read
 * @return a new mutable map from global stream id to grouping, one entry per component id
 */
public static Map<GlobalStreamId, Grouping> eventLoggerInputs(StormTopology topology) {
    Map<GlobalStreamId, Grouping> subscriptions = new HashMap<>();

    // Union of bolt ids and spout ids; the set removes any duplicates.
    Set<String> componentIds = new HashSet<>(topology.get_bolts().keySet());
    componentIds.addAll(topology.get_spouts().keySet());

    for (String componentId : componentIds) {
        subscriptions.put(
            Utils.getGlobalStreamId(componentId, EVENTLOGGER_STREAM_ID),
            Thrift.prepareFieldsGrouping(Arrays.asList("component-id")));
    }
    return subscriptions;
}
代码示例来源:origin: apache/storm
/**
 * Builds the input subscriptions keyed on the acker-related streams of every spout and bolt.
 *
 * <p>Each spout contributes its {@code ACKER_INIT_STREAM_ID} stream; each bolt contributes its
 * {@code ACKER_ACK_STREAM_ID}, {@code ACKER_FAIL_STREAM_ID} and
 * {@code ACKER_RESET_TIMEOUT_STREAM_ID} streams. Every entry uses a fields grouping on
 * {@code "id"}.
 *
 * @param topology topology whose spout and bolt ids are read
 * @return a new mutable map from global stream id to grouping
 */
public static Map<GlobalStreamId, Grouping> ackerInputs(StormTopology topology) {
    Map<GlobalStreamId, Grouping> subscriptions = new HashMap<>();
    Set<String> bolts = topology.get_bolts().keySet();
    Set<String> spouts = topology.get_spouts().keySet();

    for (String spoutId : spouts) {
        subscriptions.put(
            Utils.getGlobalStreamId(spoutId, Acker.ACKER_INIT_STREAM_ID),
            Thrift.prepareFieldsGrouping(Arrays.asList("id")));
    }

    // Streams registered for every bolt, in the same order as before: ack, fail, reset-timeout.
    String[] boltStreams = {
        Acker.ACKER_ACK_STREAM_ID,
        Acker.ACKER_FAIL_STREAM_ID,
        Acker.ACKER_RESET_TIMEOUT_STREAM_ID
    };
    for (String boltId : bolts) {
        for (String streamId : boltStreams) {
            // A fresh grouping instance is created per entry, as in the original code.
            subscriptions.put(
                Utils.getGlobalStreamId(boltId, streamId),
                Thrift.prepareFieldsGrouping(Arrays.asList("id")));
        }
    }
    return subscriptions;
}
代码示例来源:origin: apache/storm
// Register the METRICS_STREAM_ID stream of componentId as an input, using shuffle grouping.
inputs.put(Utils.getGlobalStreamId(componentId, Constants.METRICS_STREAM_ID), Thrift.prepareShuffleGrouping());
代码示例来源:origin: apache/storm
// Declare the ACKER_INIT stream with output fields "id", "init-val" and "spout-task".
common.put_to_streams(Acker.ACKER_INIT_STREAM_ID,
Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
// Register three inputs coming from the acker component (ack, fail, reset-timeout streams),
// each with direct grouping.
common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_ACK_STREAM_ID),
Thrift.prepareDirectGrouping());
common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_FAIL_STREAM_ID),
Thrift.prepareDirectGrouping());
common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_RESET_TIMEOUT_STREAM_ID),
Thrift.prepareDirectGrouping());
代码示例来源:origin: apache/storm
/**
 * Builds a topology with spout {@code "1"} ({@code TestWordSpout}) and bolt {@code "2"}
 * ({@code TestWordCounter}) whose single input is component {@code "1"} with a {@code null}
 * stream id, fields-grouped on {@code "non-exists-field"}.
 *
 * @return the assembled topology
 */
private StormTopology mkInvalidateTopology2() {
    SpoutDetails wordSpout = Thrift.prepareSpoutDetails(new TestWordSpout(true), 3);
    BoltDetails counterBolt = Thrift.prepareBoltDetails(
        Collections.singletonMap(
            Utils.getGlobalStreamId("1", null),
            // The grouping field is the only thing that differs from the valid topology.
            Thrift.prepareFieldsGrouping(Collections.singletonList("non-exists-field"))),
        new TestWordCounter(), 4);

    Map<String, SpoutDetails> spouts = Collections.singletonMap("1", wordSpout);
    Map<String, BoltDetails> bolts = Collections.singletonMap("2", counterBolt);
    return Thrift.buildTopology(spouts, bolts);
}
代码示例来源:origin: apache/storm
/**
 * Builds a topology with spout {@code "1"} ({@code TestWordSpout}) and bolt {@code "2"}
 * ({@code TestWordCounter}) whose single input is component {@code "1"} with a {@code null}
 * stream id, fields-grouped on {@code "word"}.
 *
 * @return the assembled topology
 */
private StormTopology mkValidateTopology() {
    SpoutDetails wordSpout = Thrift.prepareSpoutDetails(new TestWordSpout(true), 3);
    BoltDetails counterBolt = Thrift.prepareBoltDetails(
        Collections.singletonMap(
            Utils.getGlobalStreamId("1", null),
            Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
        new TestWordCounter(), 4);

    Map<String, SpoutDetails> spouts = Collections.singletonMap("1", wordSpout);
    Map<String, BoltDetails> bolts = Collections.singletonMap("2", counterBolt);
    return Thrift.buildTopology(spouts, bolts);
}
代码示例来源:origin: apache/storm
/**
 * Builds a topology with spout {@code "1"} ({@code TestWordSpout}) and bolt {@code "2"}
 * ({@code TestWordCounter}) whose single input names component {@code "3"}, an id that is
 * not registered in either map built here. The input is fields-grouped on {@code "word"}.
 *
 * @return the assembled topology
 */
private StormTopology mkInvalidateTopology1() {
    SpoutDetails wordSpout = Thrift.prepareSpoutDetails(new TestWordSpout(true), 3);
    BoltDetails counterBolt = Thrift.prepareBoltDetails(
        Collections.singletonMap(
            // Component "3" is neither the spout ("1") nor the bolt ("2") defined below.
            Utils.getGlobalStreamId("3", null),
            Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
        new TestWordCounter(), 4);

    Map<String, SpoutDetails> spouts = Collections.singletonMap("1", wordSpout);
    Map<String, BoltDetails> bolts = Collections.singletonMap("2", counterBolt);
    return Thrift.buildTopology(spouts, bolts);
}
代码示例来源:origin: apache/storm
/**
 * Builds a topology with spout {@code "1"} ({@code TestWordSpout}) and bolt {@code "2"}
 * ({@code TestWordCounter}) whose single input is the stream named
 * {@code "non-exists-stream"} of component {@code "1"}, fields-grouped on {@code "word"}.
 *
 * @return the assembled topology
 */
private StormTopology mkInvalidateTopology3() {
    SpoutDetails wordSpout = Thrift.prepareSpoutDetails(new TestWordSpout(true), 3);
    BoltDetails counterBolt = Thrift.prepareBoltDetails(
        Collections.singletonMap(
            // The stream name is the only thing that differs from the valid topology.
            Utils.getGlobalStreamId("1", "non-exists-stream"),
            Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
        new TestWordCounter(), 4);

    Map<String, SpoutDetails> spouts = Collections.singletonMap("1", wordSpout);
    Map<String, BoltDetails> bolts = Collections.singletonMap("2", counterBolt);
    return Thrift.buildTopology(spouts, bolts);
}
代码示例来源:origin: apache/storm
/**
 * Runs a two-component topology (spout {@code "1"} feeding {@code HooksBolt} {@code "2"} via
 * shuffle grouping) on a simulated-time local cluster, feeds four tuples each carrying the
 * value {@code 1} through mocked sources, and checks the tuples read back for bolt {@code "2"}.
 */
@Test
public void testHooks() throws Exception {
    try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().build()) {
        SpoutDetails plannerSpout = Thrift.prepareSpoutDetails(new TestPlannerSpout(new Fields("conf")));
        Map<String, SpoutDetails> spouts = Collections.singletonMap("1", plannerSpout);

        BoltDetails hooksBolt = Thrift.prepareBoltDetails(
            Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()),
            new HooksBolt());
        Map<String, BoltDetails> bolts = Collections.singletonMap("2", hooksBolt);

        StormTopology topology = Thrift.buildTopology(spouts, bolts);

        // Four input tuples, each wrapping the single value 1.
        List<FixedTuple> inputTuples = Arrays.asList(1, 1, 1, 1).stream()
            .map(number -> new FixedTuple(new Values(number)))
            .collect(Collectors.toList());

        CompleteTopologyParam params = new CompleteTopologyParam();
        params.setMockedSources(new MockedSources(Collections.singletonMap("1", inputTuples)));

        Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, params);

        List<List<Object>> expected = Arrays.asList(
            Arrays.asList(0, 0, 0, 0),
            Arrays.asList(2, 1, 0, 1),
            Arrays.asList(4, 1, 1, 2),
            Arrays.asList(6, 2, 1, 3));
        assertThat(Testing.readTuples(results, "2"), is(expected));
    }
}
代码示例来源:origin: apache/storm
// Two inputs from component "1": its null-stream-id input fields-grouped on "word", and its
// "__system" stream with global grouping.
boltInputs.put(Utils.getGlobalStreamId("1", null), Thrift.prepareFieldsGrouping(Collections.singletonList("word")));
boltInputs.put(Utils.getGlobalStreamId("1", "__system"), Thrift.prepareGlobalGrouping());
Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
Thrift.prepareBoltDetails(
代码示例来源:origin: apache/storm
/**
 * Checks ack counts for a branching topology on a tracked, simulated-time local cluster.
 *
 * <p>Spout {@code "1"} fans out via shuffle grouping to two {@code IdentityBolt}s
 * ({@code "2"} and {@code "3"}), both of which feed {@code AggBolt(4)} registered as
 * {@code "4"}. The test expects 0 acks after the first fed tuple and 2 acks after the second.
 */
@Test
public void testAckBranching() throws Exception {
    try (LocalCluster cluster = new LocalCluster.Builder()
        .withSimulatedTime()
        .withTracked()
        .build()) {
        AckTrackingFeeder feeder = new AckTrackingFeeder("num");

        Map<String, SpoutDetails> spoutMap = new HashMap<>();
        spoutMap.put("1", Thrift.prepareSpoutDetails(feeder.getSpout()));

        Map<String, BoltDetails> boltMap = new HashMap<>();
        boltMap.put("2", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new IdentityBolt()));
        boltMap.put("3", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new IdentityBolt()));

        // The aggregator subscribes to both identity bolts.
        Map<GlobalStreamId, Grouping> aggregatorInputs = new HashMap<>();
        aggregatorInputs.put(Utils.getGlobalStreamId("2", null), Thrift.prepareShuffleGrouping());
        aggregatorInputs.put(Utils.getGlobalStreamId("3", null), Thrift.prepareShuffleGrouping());
        boltMap.put("4", Thrift.prepareBoltDetails(aggregatorInputs, new AggBolt(4)));

        TrackedTopology tracked = new TrackedTopology(Thrift.buildTopology(spoutMap, boltMap), cluster);
        cluster.submitTopology("test-acking2", Collections.emptyMap(), tracked);
        cluster.advanceClusterTime(11);

        feeder.feed(new Values(1));
        Testing.trackedWait(tracked, 1);
        feeder.assertNumAcks(0);

        feeder.feed(new Values(1));
        Testing.trackedWait(tracked, 1);
        feeder.assertNumAcks(2);
    }
}
代码示例来源:origin: apache/storm
/**
 * Checks ack counts for a branching topology on a tracked, simulated-time local cluster,
 * submitting with a fresh {@code Config}.
 *
 * <p>Spout {@code "1"} fans out via shuffle grouping to two {@code IdentityBolt}s
 * ({@code "2"} and {@code "3"}), both of which feed {@code AggBolt(4)} registered as
 * {@code "4"}. The test expects 0 acks after the first fed tuple and 2 acks after the second.
 */
@Test
public void testWithTrackedCluster() throws Exception {
    try (LocalCluster cluster = new LocalCluster.Builder()
        .withSimulatedTime()
        .withTracked()
        .build()) {
        AckTrackingFeeder feeder = new AckTrackingFeeder("num");

        Map<String, Thrift.SpoutDetails> spoutMap = new HashMap<>();
        spoutMap.put("1", Thrift.prepareSpoutDetails(feeder.getSpout()));

        Map<String, Thrift.BoltDetails> boltMap = new HashMap<>();
        boltMap.put("2", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new IdentityBolt()));
        boltMap.put("3", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new IdentityBolt()));

        // The aggregator subscribes to both identity bolts.
        Map<GlobalStreamId, Grouping> aggregatorInputs = new HashMap<>();
        aggregatorInputs.put(Utils.getGlobalStreamId("2", null), Thrift.prepareShuffleGrouping());
        aggregatorInputs.put(Utils.getGlobalStreamId("3", null), Thrift.prepareShuffleGrouping());
        boltMap.put("4", Thrift.prepareBoltDetails(aggregatorInputs, new AggBolt(4)));

        TrackedTopology tracked = new TrackedTopology(Thrift.buildTopology(spoutMap, boltMap), cluster);
        cluster.submitTopology("test-acking2", new Config(), tracked);
        cluster.advanceClusterTime(11);

        feeder.feed(new Values(1));
        Testing.trackedWait(tracked, 1);
        feeder.assertNumAcks(0);

        feeder.feed(new Values(1));
        Testing.trackedWait(tracked, 1);
        feeder.assertNumAcks(2);
    }
}
代码示例来源:origin: apache/storm
// Bolts "4", "5" and "6" are BranchingBolts (arguments 2, 4 and 1) subscribed with shuffle
// grouping to components "1", "2" and "3" respectively.
boltMap.put("4", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new BranchingBolt(2)));
boltMap.put("5", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("2", null), Thrift.prepareShuffleGrouping()), new BranchingBolt(4)));
boltMap.put("6", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("3", null), Thrift.prepareShuffleGrouping()), new BranchingBolt(1)));
// Bolt "7" (AggBolt(3)) subscribes to all three branching bolts.
aggregatorInputs.put(Utils.getGlobalStreamId("4", null), Thrift.prepareShuffleGrouping());
aggregatorInputs.put(Utils.getGlobalStreamId("5", null), Thrift.prepareShuffleGrouping());
aggregatorInputs.put(Utils.getGlobalStreamId("6", null), Thrift.prepareShuffleGrouping());
boltMap.put("7", Thrift.prepareBoltDetails(aggregatorInputs, new AggBolt(3)));
// Chain after the aggregator: "7" -> BranchingBolt(2) "8" -> AckBolt "9".
boltMap.put("8", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("7", null), Thrift.prepareShuffleGrouping()), new BranchingBolt(2)));
boltMap.put("9", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("8", null), Thrift.prepareShuffleGrouping()), new AckBolt()));
代码示例来源:origin: apache/storm
/**
 * Checks ack counts for a linear topology on a tracked, simulated-time local cluster.
 *
 * <p>Spout {@code "1"} feeds {@code DupAnchorBolt} {@code "2"}, which feeds {@code AckBolt}
 * {@code "3"}, both via shuffle grouping. The test expects 1 ack after one fed tuple and
 * 3 acks after three more.
 */
@Test
public void testAckingSelfAnchor() throws Exception {
    try (LocalCluster cluster = new LocalCluster.Builder()
        .withSimulatedTime()
        .withTracked()
        .build()) {
        AckTrackingFeeder feeder = new AckTrackingFeeder("num");

        Map<String, SpoutDetails> spoutMap = new HashMap<>();
        spoutMap.put("1", Thrift.prepareSpoutDetails(feeder.getSpout()));

        Map<String, BoltDetails> boltMap = new HashMap<>();
        boltMap.put("2", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new DupAnchorBolt()));
        boltMap.put("3", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("2", null), Thrift.prepareShuffleGrouping()), new AckBolt()));

        TrackedTopology tracked = new TrackedTopology(Thrift.buildTopology(spoutMap, boltMap), cluster);
        cluster.submitTopology("test", Collections.emptyMap(), tracked);
        cluster.advanceClusterTime(11);

        feeder.feed(new Values(1));
        Testing.trackedWait(tracked, 1);
        feeder.assertNumAcks(1);

        // Feed three more tuples back to back, then wait for all three.
        feeder.feed(new Values(1));
        feeder.feed(new Values(1));
        feeder.feed(new Values(1));
        Testing.trackedWait(tracked, 3);
        feeder.assertNumAcks(3);
    }
}
代码示例来源:origin: apache/storm
/**
 * Submits a topology with initial status INACTIVE and checks that the boltPrepared and
 * spoutOpened flags stay false until the topology is activated, after which the fed tuple
 * is acked and both flags are true.
 */
@Test
public void testSubmitInactiveTopology() throws Exception {
try (LocalCluster cluster = new LocalCluster.Builder()
.withSimulatedTime()
.withDaemonConf(Collections.singletonMap(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true))
.build()) {
FeederSpout feeder = new FeederSpout(new Fields("field1"));
// The tracker receives ack/fail callbacks for tuples fed through the feeder spout.
AckFailMapTracker tracker = new AckFailMapTracker();
feeder.setAckFailDelegate(tracker);
Map<String, SpoutDetails> spoutMap = new HashMap<>();
spoutMap.put("1", Thrift.prepareSpoutDetails(feeder));
spoutMap.put("2", Thrift.prepareSpoutDetails(new OpenTrackedSpout()));
Map<String, BoltDetails> boltMap = new HashMap<>();
boltMap.put("3", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareGlobalGrouping()), new PrepareTrackedBolt()));
// Reset the flags asserted on below.
// NOTE(review): presumably set by PrepareTrackedBolt / OpenTrackedSpout - confirm.
boltPrepared = false;
spoutOpened = false;
StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
// Submit in the INACTIVE state with TOPOLOGY_MESSAGE_TIMEOUT_SECS set to 10.
cluster.submitTopologyWithOpts("test", Collections.singletonMap(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10), topology, new SubmitOptions(TopologyInitialStatus.INACTIVE));
cluster.advanceClusterTime(11);
feeder.feed(new Values("a"), 1);
cluster.advanceClusterTime(9);
// While inactive, neither flag is expected to have been set.
assertThat(boltPrepared, is(false));
assertThat(spoutOpened, is(false));
// Activate the topology, advance the clock again, then expect the ack and both flags.
cluster.getNimbus().activate("test");
cluster.advanceClusterTime(12);
assertAcked(tracker, 1);
assertThat(boltPrepared, is(true));
assertThat(spoutOpened, is(true));
}
}
代码示例来源:origin: apache/storm
/**
 * Runs spout {@code "1"} ({@code TestWordSpout}) into {@code EmitTaskIdBolt} {@code "2"} with
 * all-grouping on a simulated-time local cluster with 4 supervisors. The bolt is configured
 * with {@code Config.TOPOLOGY_TASKS} set to 6, and a single mocked tuple {@code "a"} is
 * expected to yield the values 0 through 5 from bolt {@code "2"}, in any order.
 */
@Test
public void testMultiTasksPerCluster() throws Exception {
    try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().withSupervisors(4).build()) {
        SpoutDetails wordSpout = Thrift.prepareSpoutDetails(new TestWordSpout(true));
        Map<String, SpoutDetails> spouts = Collections.singletonMap("1", wordSpout);

        Map<String, BoltDetails> bolts = new HashMap<>();
        BoltDetails taskIdBolt = Thrift.prepareBoltDetails(
            Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareAllGrouping()),
            new EmitTaskIdBolt(),
            3,
            Collections.singletonMap(Config.TOPOLOGY_TASKS, 6));
        bolts.put("2", taskIdBolt);

        StormTopology topology = Thrift.buildTopology(spouts, bolts);

        // A single tuple "a" is supplied in place of the spout's own output.
        MockedSources sources = new MockedSources(
            Collections.singletonMap("1", Collections.singletonList(new FixedTuple(new Values("a")))));
        CompleteTopologyParam params = new CompleteTopologyParam();
        params.setMockedSources(sources);

        Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, params);

        assertThat(Testing.readTuples(results, "2"), containsInAnyOrder(
            new Values(0),
            new Values(1),
            new Values(2),
            new Values(3),
            new Values(4),
            new Values(5)));
    }
}
代码示例来源:origin: apache/storm
Thrift.prepareBoltDetails(
Collections.singletonMap(
Utils.getGlobalStreamId("1", null),
Thrift.prepareGlobalGrouping()),
new AckEveryOtherBolt()));
代码示例来源:origin: apache/storm
// Bolt "2" is an AckEveryOtherBolt subscribed to component "1" with shuffle grouping.
boltMap.put("2", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new AckEveryOtherBolt()));
代码示例来源:origin: apache/storm
// Bolt "2" is an AckEveryOtherBolt subscribed to component "1" with shuffle grouping.
boltMap.put("2", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new AckEveryOtherBolt()));
代码示例来源:origin: apache/storm
Thrift.prepareBoltDetails(
Collections.singletonMap(
Utils.getGlobalStreamId("1", null),
Thrift.prepareGlobalGrouping()),
new ResetTimeoutBolt()));
内容来源于网络,如有侵权,请联系作者删除!