我已经创建了一个带有喷口的storm拓扑,该喷口发出许多元组以进行基准测试。一旦所有元组都从喷口发出或者拓扑中不再有元组流动,我就想停止/终止我的拓扑。
这是我的拓扑图。
LocalCluster cluster = new LocalCluster();
TopologyBuilder builder = new TopologyBuilder();
Config conf = new Config();
//Disabled ACK'ing for higher throughput
conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);
LoadGeneratorSource loadGenerator = new LoadGeneratorSource(runtime,numberOfTuplesToBeEmitted);
builder.setSpout("loadGenerator", loadGenerator);
//Some Bolts Here
while (loadGenerator.isRunning()){
//Active Waiting
}
//DO SOME STUFF WITH JAVA
cluster.killTopology("StormBenchmarkTopology");
问题是我在这个范围内引用的loadgenerator示例与在spout线程中运行的不同。因此,isrunning()总是返回true,即使在喷口线程中,当没有更多元组要发出时,它的值是false。
这里是loadgeneratorsource类的一部分。
public class LoadGeneratorSource extends BaseRichSpout {
private final int throughput;
private boolean running;
private final long runtime;
public LoadGeneratorSource(long runtime,int throughput) {
this.throughput = throughput;
this.runtime = runtime;
}
@Override
public void nextTuple() {
ThroughputStatistics.getInstance().pause(false);
long endTime = System.currentTimeMillis() + runtime;
while (running) {
long startTs = System.currentTimeMillis();
for (int i = 0; i < throughput; i++) {
try {
emitValue(readNextTuple());
} catch (Exception e) {
e.printStackTrace();
}
}
while (System.currentTimeMillis() < startTs + 1000) {
// active waiting
}
if (endTime <= System.currentTimeMillis())
setRunning(false);
}
}
public boolean isRunning() {
return running;
}
public void setRunning(boolean running) {
this.running = running;
}
//MORE STUFF
}
有人能告诉我一种方法来停止我的拓扑一旦有更多的元组发射从喷口或流动的拓扑?提前谢谢你的帮助。
1条答案
按热度按时间exdqitrt1#
这看起来像是从喷口杀死风暴拓扑的复制品。请试一下这里给出的答案。
只是简单总结一下;你尝试的方法是行不通的,但是你可以使用一个来自喷口的nimbusclient请求nimbus终止你的拓扑。另一个好处是,一旦您部署到一个真正的集群,它也会起作用。