ApacheStorm2.1.0本地drpc不返回任何响应,尽管最后一个螺栓向收集器发出了一个元组

zsbz8rwp  于 2021-06-24  发布在  Storm
关注(0)|答案(1)|浏览(352)

我在尝试运行包含一个螺栓的drpc拓扑并通过本地集群查询它时遇到问题。在使用intellij进行调试之后,bolt确实被执行了,但是jcqueue在bolt被执行之后一直卡在无限循环中,直到向服务器发送超时为止。
以下是用于构建拓扑生成器的代码:

public static LinearDRPCTopologyBuilder createBuilder()
{
    var bolt = new RedisSalesLookupBolt(createRedisConfiguration(), new RedisTurnoverMapper());
    var builder = new LinearDRPCTopologyBuilder("sales");
    builder.addBolt(bolt, 1).localOrShuffleGrouping();
    return builder;
}

redisaleslookupolt只是ibasicbolt对绝地执行hget命令的一个非常简单的实现。这个 execute RedisalesLookupBolt的方法只是发出 Values 包含如下声明的两个字段的值:

declarer.declare(new Fields("id", "Value"));

在单元测试中构建和查询拓扑,如下所示:

Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(1);

try(LocalDRPC drpc = new LocalDRPC())
{
       LocalCluster cluster = new LocalCluster();
       var builder = BasicRedisRPCTopology.createBuilder();
       LocalCluster.LocalTopology topo = cluster.submitTopology(
              "Sales-fetch", conf, builder.createLocalTopology(drpc));
       var result = drpc.execute("sales", "XXXXX");
       System.out.println("################ Result: " + result);
}
catch (Exception e)
{
       e.printStackTrace();
}

在读取日志时,我确信数据被螺栓固定,并且所有的东西都被释放出来了

但最后,我用我的测试方法轻轻地打印出了这个堆栈跟踪。当然,不会将任何值分配给结果变量,并且进程永远不会到达最后的打印指令:

有些东西我可能不明白,有些东西我在这里错过了。boltexecutor用来检索要执行的bolt的id的jcqueue似乎永远不会结束,尽管只有一个参数被发送到本地drpc,只有一个bolt被声明到拓扑中。我已经尝试向拓扑添加更多螺栓,或者更改用于创建它的生成器实现,但没有成功。

c8ib6hqw

c8ib6hqw1#

嗯,我找到了一个适合我使用ApacheStorm 2.1.0的用例的解决方案。
看来 submitTopology 文档中提出的本地集群方法没有使用 LinearDRPCTopologyBuilder 构建拓扑。
通过查看源代码,可以了解如何应用 LinearDRPCTopologyBuilder 逻辑到 TopologyBuilder 直接。
以下是应用于 createBuilder 方法:

public static TopologyBuilder createBuilder(Optional<ILocalDRPC> localDRPC)
    {
        var spout = localDRPC
                .map(drpc -> new DRPCSpout("sales", drpc))
                .orElse(new DRPCSpout("sales"));
        var bolt = new RedisSalesLookupBolt(createRedisConfiguration(), new RedisTurnoverMapper());
        var builder = new TopologyBuilder();
        builder.setSpout("drpc", spout);
        builder.setBolt("redisLookup", bolt, 1)
               .shuffleGrouping("drpc");
        builder.setBolt("return", new ReturnResults())
               .shuffleGrouping("redisLookup");
        return builder;
    }

下面是单元测试现在的样子:

Config conf = new Config();
        conf.setDebug(true);
        conf.setNumWorkers(1);

        try(LocalDRPC drpc = new LocalDRPC())
        {
            LocalCluster cluster = new LocalCluster();
            var builder = BasicRedisRPCTopology.createBuilder(Optional.of(drpc));
            cluster.submitTopology("Sales-fetch", conf, builder.createTopology());
            var result = drpc.execute("sales", "XXXXX");
            System.out.println("################ Result: " + result);
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }

不幸的是,此解决方案不允许使用 LinearDRPCTopologyBuilder 并暗示要“手动”构建所有拓扑流。必须将Map器行为更改为,因为字段的公开顺序与以前不同。

相关问题