ApacheStorm—从一个数据库获取数据,并使用trident拓扑将其处理和存储到另一个数据库

2guxujil  于 2021-06-24  发布在  Storm
关注(0)|答案(1)|浏览(358)

我想通过一个喷口从一个数据库中获取数据,并使用trident处理数据并将其存储在另一个数据库中。我对storm和trident不太熟悉,不知道如何实现它。我从一个喷口(实现trident支持的irichspout的独立java类)中的数据库中获取数据,并将其作为对象发出。我需要将其传递给trident拓扑进行处理(计算记录数)并将其存储到数据库中。

TridentTopology topology = new TridentTopology();  
 TridentState wordCounts =
          topology.newStream("spout1",spout)

现在新的流以一个喷口作为输入,即语法是

Stream storm.trident.TridentTopology.newStream(String txId, IRichSpout spout)

但是我想将喷口发出的对象作为流的输入,以便trident处理并保存到数据库中。那么,如何将我的喷口类放入trident中并将其传递给新流,还是将喷口和trident合并为同一个类??
有人能帮我吗。。。。。

o2gm4chl

o2gm4chl1#

你可以这样做

MyFooSpout spout = new MyFooSpout();
    topology.newStream("spout1", spout)....

在哪里 MyFooSpout 类应实现 IRichSpout 从三叉戟教程 newStream 中的方法 TridentTopology 在拓扑中创建从任何输入源读取的数据流。
你的情况可能是 MyFooSpout
。我在一个spout(实现trident支持的irichspout的独立java类)中从数据库中获取数据,并将其作为对象发出
你能澄清一下你到底指的是什么吗?你的喷口代码看起来怎么样?作为一个非常通用的例子,如果我们写的东西像(取自教程页)

TridentState wordCounts = topology.newStream("spout1", spout).each(new Fields("sentence"), new Split(), new Fields("word"))

这意味着 spout 应该只发射一个场,即 sentence . 通过呼叫 each 这个 Split 函数将应用于流中的每个元组,该元组将根据 sentence 现场。但是,这可能会因您的要求而有所不同。e、 它可能是一个 Filter 作为 MyFilter extends BaseFilter 或者 function 作为 MyCustomFuction extends BaseFunction . 查看api页面了解更多详细信息。

相关问题