我想通过一个喷口从一个数据库中获取数据,并使用trident处理数据并将其存储在另一个数据库中。我对storm和trident不太熟悉,不知道如何实现它。我从一个喷口(实现trident支持的irichspout的独立java类)中的数据库中获取数据,并将其作为对象发出。我需要将其传递给trident拓扑进行处理(计算记录数)并将其存储到数据库中。
TridentTopology topology = new TridentTopology();
TridentState wordCounts =
topology.newStream("spout1",spout)
现在新的流以一个喷口作为输入,即语法是
Stream storm.trident.TridentTopology.newStream(String txId, IRichSpout spout)
但是我想将喷口发出的对象作为流的输入,以便trident处理并保存到数据库中。那么,如何将我的喷口类放入trident中并将其传递给新流,还是将喷口和trident合并为同一个类??
有人能帮我吗。。。。。
1条答案
按热度按时间o2gm4chl1#
你可以这样做
在哪里
MyFooSpout
类应实现IRichSpout
从三叉戟教程newStream
中的方法TridentTopology
在拓扑中创建从任何输入源读取的数据流。你的情况可能是
MyFooSpout
班。我在一个spout(实现trident支持的irichspout的独立java类)中从数据库中获取数据,并将其作为对象发出
你能澄清一下你到底指的是什么吗?你的喷口代码看起来怎么样?作为一个非常通用的例子,如果我们写的东西像(取自教程页)
这意味着
spout
应该只发射一个场,即sentence
. 通过呼叫each
这个Split
函数将应用于流中的每个元组,该元组将根据sentence
现场。但是,这可能会因您的要求而有所不同。e、 它可能是一个Filter
作为MyFilter extends BaseFilter
或者function
作为MyCustomFuction extends BaseFunction
. 查看api页面了解更多详细信息。