我正在尝试将storm(见这里)集成到我的项目中。我摸索拓扑、喷口和螺栓的概念。但现在,我正试图弄清楚一些事情的实际实现。
a) 我有一个用java和clojure的多语言环境。我的java代码是一个回调类,其中包含触发流数据的方法。推送到这些方法中的事件数据,就是我想用作喷口的内容。
因此,第一个问题是如何将进入这些方法的数据连接到一个喷口?我试图i)传递backtype.storm.topology.irichspout,然后ii)传递backtype.storm.spout.spoutoutOutCollector(请参见此处)到该spout的open函数(请参见此处)。但我看不到一种方法能真正通过任何一种Map或列表。
b) 我的项目剩下的都是clojure。通过这些方法将会有大量的数据。每个事件的id将介于1和100之间。在clojure中,我希望将来自喷口的数据分割成不同的执行线程。我认为,这些将是螺栓。
如何设置clojure螺栓从喷口获取事件数据,然后根据传入事件的id断开线程?
提前谢谢蒂姆
[编辑1]
实际上我已经克服了这个问题。我最终实现了我自己的虹彩嘴。然后我2)将喷口的内部元组连接到java回调类中的传入流数据。我不确定这是否是惯用的。但是它编译和运行没有错误。但是,3)我看不到通过printstuffbolt传入的流数据(肯定在那里)。
为了确保事件数据得到传播,在spout或bolt实现或拓扑定义中是否需要做一些特定的事情?谢谢。
;; tie Java callbacks to a Spout that I created
(.setSpout java-callback ibspout)
(storm/defbolt printstuff ["word"] [tuple collector]
(println (str "printstuff --> tuple["tuple"] > collector["collector"]"))
)
(storm/topology
{ "1" (storm/spout-spec ibspout)
}
{ "3" (storm/bolt-spec { "1" :shuffle }
printstuff
)
})
[编辑2]
在so成员ankur的建议下,我正在重新调整我的拓扑结构。创建java回调之后,我将其元组传递给下面的ibspout,使用 (.setTuple ibspout (.getTuple java-callback))
. 我没有传递整个java回调对象,因为我得到了一个notserializable错误。一切编译和运行都没有错误。不过,我的打印资料里没有数据。六羟甲基三聚氰胺六甲醚。
public class IBSpout implements IRichSpout {
/**
* Storm spout stuff
*/
private SpoutOutputCollector _collector;
private List _tuple = new ArrayList();
public void setTuple(List tuple) { _tuple = tuple; }
public List getTuple() { return _tuple; }
/**
* Storm ISpout interface functions
*/
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
}
public void close() {}
public void activate() {}
public void deactivate() {}
public void nextTuple() {
_collector.emit(_tuple);
}
public void ack(Object msgId) {}
public void fail(Object msgId) {}
public void declareOutputFields(OutputFieldsDeclarer declarer) {}
public java.util.Map getComponentConfiguration() { return new HashMap(); }
}
2条答案
按热度按时间70gysomp1#
对b部分的答复:
对我来说,这个简单的答案听起来像是在寻找一个字段分组,这样就可以控制在执行期间按id分组的工作方式。
也就是说,我不相信这是一个完整的答案,因为我不知道你为什么要这样做。如果您只是想要一个平衡的工作负载,那么无序分组是一个更好的选择。
smtd7mpg2#
似乎您正在将这个喷口传递给回调类,这似乎有点奇怪。当一个拓扑被执行时,storm会周期性地调用喷口
nextTuple
方法,因此您需要做的是将java回调传递给您的定制spout实现,以便在storm调用您的spout时,spout调用java回调来获取下一组元组,以便将它们输入到拓扑中。要理解的关键概念是,当storm请求时,spouts会拉取数据,而不是将数据推送到spouts。您的回调不能调用spout将数据推送到它,相反,当您的spout
nextTuple
方法被调用。