java—如何在ApacheStorm中以编程方式重新启动拓扑的worker

b09cbbtk  于 2021-06-24  发布在  Storm
关注(0)|答案(2)|浏览(349)

我在Apache风暴中面临一个问题
问题场景:
当小数据被发送到storm时,数据会被拓扑正确地处理(只有一个工作线程)并进一步释放以在mongodb中持久化。
但当数据量很大时,它会对数据进行处理并保存在数据库中,但以后无论数据量大小都不会接受任何其他数据。
当前解决方法:
我们从storm ui重新启动worker。
问题:
我们能以编程方式重新启动拓扑工作者吗?

i86rm4rw

i86rm4rw1#

重启worker从来不是一个好的解决方案,可能会丢失一些元组。最佳实践是利用storm消息可靠性功能,正如rahim所回答的那样。
然而,除了消息可靠性风暴有一个内部背压机制,这意味着当喷口注入的数据超过螺栓能够处理的数据时,喷口将自动减速。
要实现这一点,您首先需要,正如rahim所说,启用acking。这意味着如果您的拓扑结构很简单:
喷口->螺栓
喷口可以:

public void nextTuple(){
  ...
  _collector.emit(new Values(tuple), tupleId);
}

@Override
publci void ack(Object msgId) { super.ack(msgId); }

其中tupleid是一个增量计数器 count++ . 通过这种方式,您可以声明对等待确认的新元组进行风暴处理。
同时,在连续螺栓和拓扑中的所有连续螺栓中,或者至少在导致瓶颈的螺栓之前,您将编写:

public void execute(Tuple tuple){
  ...
  _collector.emit(tuple, new Values(newTuple));
  _collector.ack(tuple);
}

通过这种方式,您将注意到元组已被完全处理。
至少但不是最后,在声明拓扑生成器的main方法中,必须定义喷口将等待的最大元组数:

Config conf = new Config();
conf.setMaxSpoutPending(100);

通过这种方式,喷口将开始生成等待确认的新元组,如果(在本例中)挂起的元组数超过100,喷口将停止调用nexttuple方法,等待它们被确认,然后生成新元组。
n、 b:值100只是一个例子,您可能需要对它进行一些调整,以使其适合您的情况。
rahim共享的链接应该足以理解该机制,无论如何,如果您想深入到实现,我添加以下链接:
http://storm.apache.org/releases/current/acking-framework-implementation.html

bprjcwpo

bprjcwpo2#

storm有两种螺栓类型:irichbolt和ibasicbolt。如果您实现了ibasicbolt,那么您也应该实现acknowledge。此外,您应该发送确认在您的螺栓,以防止锁定。这些链接是好的:
http://storm.apache.org/releases/1.0.6/concepts.htmlhttphttp://storm.apache.org/releases/1.2.2/message-processing.html

相关问题