storm集群重复元组

p1iqtdky  于 2021-06-21  发布在  Storm
关注(0)|答案(1)|浏览(366)

目前我正在进行一个项目,在这个项目中,我在四台unix主机上设置了一个storm集群。
拓扑结构本身如下所示:
jms-spout侦听mq以获取新消息
jms spout解析并将结果发送到esper bolt
esperbolt然后处理事件并向jms bolt发送结果
然后,jms bolt将消息发布回另一个主题上的mq
我意识到storm是一个“至少一次”的框架。但是,如果我接收到5个事件并将它们传递给esperbolt进行计数,那么出于某种原因,我将在jms bolt中接收到5个计数结果(都是相同的值)。
理想情况下,我想接收一个结果输出,有没有什么方法可以告诉storm忽略重复元组?
我认为这与我设置的并行性有关,因为当我只有一个线程时,它会按预期工作:

TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout(JMS_DATA_SPOUT, new JMSDataSpout(),2).setNumTasks(2);
    builder.setBolt("esperBolt", new EsperBolt.Builder().build(),6).setNumTasks(6)
            .fieldsGrouping(JMS_DATA_SPOUT,new Fields("eventGrouping"));
    builder.setBolt("jmsBolt", new JMSBolt(),2).setNumTasks(2).fieldsGrouping("esperBolt", new Fields("eventName"));

我也见过三叉戟的“恰好一次”语义。然而,我并不完全相信这能解决这个问题。

q5lcpyga

q5lcpyga1#

如果您的esperbolt没有在execute()方法的末尾显式地ack()每个元组,或者没有使用ibasicbolt实现,那么它接收到的每个元组最终都将在超时后由您的源jms喷口重放。
或者,如果您要求bolt“仅处理唯一消息”,请考虑将此处理行为添加到execute()方法中。它可以首先检查本地guava缓存的元组值唯一性,然后进行相应的处理。

相关问题