从输出收集器向drpc请求传递一个值?

des4xlb0  于 2021-06-24  发布在  Storm
关注(0)|答案(1)|浏览(255)

我正在尝试实现trident+drpc。我设计的拓扑结构不会无限期地运行。我有两个独立的类,一个用于喷口实现,另一个用于实现drpc和trident。我的spout类(扩展irichspout的spout)发出客户的id。即

public class TriSpout implements IRichSpout{
  //some logic here
    spoutOutputCollector.emit(new Values(id))
  }

现在我从另一个类中的输出收集器获得了值,该类使用drpc实现了trident。

public class TriDrpc{

    .....
    TriSpout spout=new TriSpout1();        
    TridentTopology topology = new TridentTopology();  
    TridentState wordCounts =
          topology.newStream("spout1",spout)
            .parallelismHint(1)
            .each(new Fields("id"), new Compute(), new Fields("value"))
            .persistentAggregate(new MemoryMapState.Factory(),
                                 new Count(), new Fields("count"))

drpc拓扑定义如下

topology.newDRPCStream("Calc", drpc)
         .each(new Fields("args"), new Split(), new Fields("word"))                
         .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"));

drpc请求如下

public static void main(String[] args) throws Exception {
    Config conf = new Config(); 

    if (args.length == 0) {
    LocalDRPC drpc = new LocalDRPC();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("Calculator", conf,   buildTopology(drpc));          
    System.out.println("DRPC RESULT: "
                + drpc.execute("Calc", "id"));
    Thread.sleep(1000);

    } else {
        conf.setNumWorkers(8);
        StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
    }
}

现在在上面的代码中,在drpc请求中,即。

System.out.println("DRPC RESULT: " + drpc.execute("Calc", "id"));

这个 "id" 应该与喷口发出的id相同,即我想知道哪个客户有使用此id的活动帐户,因此我需要发送喷口发出的所有id的drpc请求。现在drpc是在main类中,我怎样才能在不手动指定id的情况下将喷口发出的值传递给drpc请求呢?
有人能帮忙吗
用新信息编辑

kt06eoxx

kt06eoxx1#

更新

好吧,现在你的问题更清楚了,谢谢。
因此,您需要处理drpc请求,请求的id与同一drpc的拓扑喷口发出的id相同。
实现这一点的唯一方法是将从喷口发出的id持久化到storm外部持久存储(例如,rdms或分布式hashmap)。
这样,在提交拓扑以在storm集群上执行之后,就可以轮询持久存储以获取新id,并对每个新id执行drpc请求。

原始答案

我想我不明白这个问题。您是否正在尝试使用来自同一drpc拓扑的输出的请求id参数执行storm drpc请求?我不认为这是一个有效的和有意使用的drpc拓扑。你最好用普通的拓扑结构。
drpc拓扑用于有限计算,而普通拓扑用于连续计算。drpc调用接受drpc拓扑的名称,以及一组用于计算drpc调用结果的输入参数。普通的storm(或trident)拓扑只是无限期地运行,计算某种结果并将其持久化。
我希望这有帮助。如果不是的话,请把你的问题改好,因为你的问题不太清楚。

相关问题