从输出收集器向drpc请求传递一个值?

des4xlb0  于 2021-06-24  发布在  Storm
关注(0)|答案(1)|浏览(310)

我正在尝试实现trident+drpc。我设计的拓扑结构不会无限期地运行。我有两个独立的类,一个用于喷口实现,另一个用于实现drpc和trident。我的spout类(扩展irichspout的spout)发出客户的id。即

  1. public class TriSpout implements IRichSpout{
  2. //some logic here
  3. spoutOutputCollector.emit(new Values(id))
  4. }

现在我从另一个类中的输出收集器获得了值,该类使用drpc实现了trident。

  1. public class TriDrpc{
  2. .....
  3. TriSpout spout=new TriSpout1();
  4. TridentTopology topology = new TridentTopology();
  5. TridentState wordCounts =
  6. topology.newStream("spout1",spout)
  7. .parallelismHint(1)
  8. .each(new Fields("id"), new Compute(), new Fields("value"))
  9. .persistentAggregate(new MemoryMapState.Factory(),
  10. new Count(), new Fields("count"))

drpc拓扑定义如下

  1. topology.newDRPCStream("Calc", drpc)
  2. .each(new Fields("args"), new Split(), new Fields("word"))
  3. .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"));

drpc请求如下

  1. public static void main(String[] args) throws Exception {
  2. Config conf = new Config();
  3. if (args.length == 0) {
  4. LocalDRPC drpc = new LocalDRPC();
  5. LocalCluster cluster = new LocalCluster();
  6. cluster.submitTopology("Calculator", conf, buildTopology(drpc));
  7. System.out.println("DRPC RESULT: "
  8. + drpc.execute("Calc", "id"));
  9. Thread.sleep(1000);
  10. } else {
  11. conf.setNumWorkers(8);
  12. StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
  13. }
  14. }

现在在上面的代码中,在drpc请求中,即。

  1. System.out.println("DRPC RESULT: " + drpc.execute("Calc", "id"));

这个 "id" 应该与喷口发出的id相同,即我想知道哪个客户有使用此id的活动帐户,因此我需要发送喷口发出的所有id的drpc请求。现在drpc是在main类中,我怎样才能在不手动指定id的情况下将喷口发出的值传递给drpc请求呢?
有人能帮忙吗
用新信息编辑

kt06eoxx

kt06eoxx1#

更新

好吧,现在你的问题更清楚了,谢谢。
因此,您需要处理drpc请求,请求的id与同一drpc的拓扑喷口发出的id相同。
实现这一点的唯一方法是将从喷口发出的id持久化到storm外部持久存储(例如,rdms或分布式hashmap)。
这样,在提交拓扑以在storm集群上执行之后,就可以轮询持久存储以获取新id,并对每个新id执行drpc请求。

原始答案

我想我不明白这个问题。您是否正在尝试使用来自同一drpc拓扑的输出的请求id参数执行storm drpc请求?我不认为这是一个有效的和有意使用的drpc拓扑。你最好用普通的拓扑结构。
drpc拓扑用于有限计算,而普通拓扑用于连续计算。drpc调用接受drpc拓扑的名称,以及一组用于计算drpc调用结果的输入参数。普通的storm(或trident)拓扑只是无限期地运行,计算某种结果并将其持久化。
我希望这有帮助。如果不是的话,请把你的问题改好,因为你的问题不太清楚。

相关问题