我正在尝试实现trident+drpc。我设计的拓扑结构不会无限期地运行。我有两个独立的类,一个用于喷口实现,另一个用于实现drpc和trident。我的spout类(扩展irichspout的spout)发出客户的id。即
public class TriSpout implements IRichSpout{
//some logic here
spoutOutputCollector.emit(new Values(id))
}
现在我从另一个类中的输出收集器获得了值,该类使用drpc实现了trident。
public class TriDrpc{
.....
TriSpout spout=new TriSpout1();
TridentTopology topology = new TridentTopology();
TridentState wordCounts =
topology.newStream("spout1",spout)
.parallelismHint(1)
.each(new Fields("id"), new Compute(), new Fields("value"))
.persistentAggregate(new MemoryMapState.Factory(),
new Count(), new Fields("count"))
drpc拓扑定义如下
topology.newDRPCStream("Calc", drpc)
.each(new Fields("args"), new Split(), new Fields("word"))
.stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"));
drpc请求如下
public static void main(String[] args) throws Exception {
Config conf = new Config();
if (args.length == 0) {
LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Calculator", conf, buildTopology(drpc));
System.out.println("DRPC RESULT: "
+ drpc.execute("Calc", "id"));
Thread.sleep(1000);
} else {
conf.setNumWorkers(8);
StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
}
}
现在在上面的代码中,在drpc请求中,即。
System.out.println("DRPC RESULT: " + drpc.execute("Calc", "id"));
这个 "id"
应该与喷口发出的id相同,即我想知道哪个客户有使用此id的活动帐户,因此我需要发送喷口发出的所有id的drpc请求。现在drpc是在main类中,我怎样才能在不手动指定id的情况下将喷口发出的值传递给drpc请求呢?
有人能帮忙吗
用新信息编辑
1条答案
按热度按时间kt06eoxx1#
更新
好吧,现在你的问题更清楚了,谢谢。
因此,您需要处理drpc请求,请求的id与同一drpc的拓扑喷口发出的id相同。
实现这一点的唯一方法是将从喷口发出的id持久化到storm外部持久存储(例如,rdms或分布式hashmap)。
这样,在提交拓扑以在storm集群上执行之后,就可以轮询持久存储以获取新id,并对每个新id执行drpc请求。
原始答案
我想我不明白这个问题。您是否正在尝试使用来自同一drpc拓扑的输出的请求id参数执行storm drpc请求?我不认为这是一个有效的和有意使用的drpc拓扑。你最好用普通的拓扑结构。
drpc拓扑用于有限计算,而普通拓扑用于连续计算。drpc调用接受drpc拓扑的名称,以及一组用于计算drpc调用结果的输入参数。普通的storm(或trident)拓扑只是无限期地运行,计算某种结果并将其持久化。
我希望这有帮助。如果不是的话,请把你的问题改好,因为你的问题不太清楚。