我试着用三叉戟做一个小例子。我们的目标是观察元组在失败时是如何重放的。下面是拓扑定义
Random rand = new Random();
Config config = new Config();
config.setDebug(true);
config.setNumWorkers(1);
TridentTopology topology = new TridentTopology();
topology.newStream("spout", new RandomIntegerSpout())
.map((MapFunction) tridentTuple -> {
if ((tridentTuple.getLongByField("msgid") % 50 == 0) &&
(rand.nextInt(2) == 1)) {
System.out.println(String.format("Failed to process tuple %d", tridentTuple.getLongByField("msgid")));
throw new ReportedFailedException("Divisible by 50");
}
return new Values(tridentTuple.toArray());
})
.peek((Consumer) tridentTuple -> System.out.println(tridentTuple.getValues()));
我使用 RandomIntegerSpout
从风暴开始 BaseRichSpout
产生随机数。然后我申请 MapFunction
它只是每50个元组抽取一个随机数,然后随机地使元组失败。
问题是,我没有得到任何 ack
s或 fail
s。
我玩了一下喷口,在调试模式下运行它,尝试了相同的示例输出,用标准的风暴螺栓进行了尝试。锚定工作正常,只是没有被三叉戟呼叫。
我在1.2.3版和2.0.0版中用localcluster和stormsubmitter重现了这个问题。
下面是storm ui的截图:
与map-ack和tuple对应的螺栓如预期的那样失败,但是这永远不会传播回喷口。
我原以为三叉戟大师会期望某种状态下的持久性来实现拓扑完成,但用某种持久聚合替换peek并没有帮助。我还排除了 map
用同样的方法 each
.
查看代码几乎是微不足道的检查,我可能误解了一些基本的三叉戟/风暴。我以为三叉戟会叫喷口的 ack
方法是否完成了批处理?我意识到没有 fail
中的方法 IBatchSpout
. 三叉戟如何处理批量回放??
1条答案
按热度按时间rwqw0loc1#
三叉戟喷口不会在单个元组级别确认或失败元组。相反,元组是作为一个批进行打包的。
三叉戟喷口通常看起来像这样的接口。
其思想是trident将管理跟踪批处理元组的acks/fails,然后如果批处理失败,它将要求喷口重复该批处理,如果没有,它根本不会。
请注意这与标准风暴喷口有何不同。对于一个普通的喷口,框架基本上告诉喷口“嘿,发射一些东西。由你决定你发射什么“,然后
ack
以及fail
方法用于告诉喷口是否应该再次发出特定的元组。使用trident时,喷口会被告知“嘿,(re)emit batch number x”,然后由喷口知道该批中有哪些元组。有了这种模式,就不需要
fail
方法。一些三叉戟喷口会有一个ack/succeed
方法,以允许喷口下降与特定进行中的批处理相关的任何状态。用于 Package
IRichSpouts
,有一些桥接代码将它们 Package 到trident api中。基本上, Package 器调用nextTuple
直到它有一个完整的批处理,然后它将id存储在缓存中。如果 Package 器被要求重新提交一个批,它将调用fail
在喷口上。否则,它会调用ack
一旦批处理成功。我认为你在storm ui中没有看到与此相关的任何内容的原因是
IRichBolt
实际上并没有在那里表现出来。相反,它是包起来的,所以ack/fail
电话是发生在“引擎盖下”的内部spout-spout
组件。如果您想确定是否正在调用ack/fail,请尝试向ack/fail
你的方法IRichSpout
.