我在群集上运行的storm topology有以下代码:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader", new WordReader());
builder.setBolt("word-normalizer", new WordNormalizer())
.shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCounter()).fieldsGrouping(
"word-normalizer", new Fields("word"));
Config conf = new Config();
conf.put("wordsFile", args[0]);
conf.setDebug(false);
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
try {
StormSubmitter.submitTopology("Test-topology", conf,
builder.createTopology());
} catch (AlreadyAliveException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InvalidTopologyException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
当我调试它时,下面 cleanup()
方法未在类中运行 WordCounter
...
@Override
public void cleanup() {
System.out.println("-- Word Counter --");
}
…拓扑结构还没有运行完。
2条答案
按热度按时间xkftehaa1#
清理()
在本地模式下调用
shutdown()
你的localcluster
. 但不是在生产集群上storm kill
执行或从nimbus服务器中删除(从不在生产集群上调用cleanup,仅在本地模式下调用)有关更多详细信息:请转到此文档
同样的事情也发生在
close()
此外(这是喷口)更多的细节,在这里检查cnjp1d6j2#
不在集群上调用清理。只是本地模式
http://groups.google.com/group/storm-user/browse_thread/thread/1a7b4998e6599f83/9683ed2b34e2c027?hl=en&lnk=gst&q=clean+up#9683ed2b34e2c027