我在Apache风暴中面临一个问题
问题场景:
当小数据被发送到storm时,数据会被拓扑正确地处理(只有一个工作线程)并进一步释放以在mongodb中持久化。
但当数据量很大时,它会对数据进行处理并保存在数据库中,但以后无论数据量大小都不会接受任何其他数据。
当前解决方法:
我们从storm ui重新启动worker。
问题:
我们能以编程方式重新启动拓扑工作者吗?
我在Apache风暴中面临一个问题
问题场景:
当小数据被发送到storm时,数据会被拓扑正确地处理(只有一个工作线程)并进一步释放以在mongodb中持久化。
但当数据量很大时,它会对数据进行处理并保存在数据库中,但以后无论数据量大小都不会接受任何其他数据。
当前解决方法:
我们从storm ui重新启动worker。
问题:
我们能以编程方式重新启动拓扑工作者吗?
2条答案
按热度按时间i86rm4rw1#
重启worker从来不是一个好的解决方案,可能会丢失一些元组。最佳实践是利用storm消息可靠性功能,正如rahim所回答的那样。
然而,除了消息可靠性风暴有一个内部背压机制,这意味着当喷口注入的数据超过螺栓能够处理的数据时,喷口将自动减速。
要实现这一点,您首先需要,正如rahim所说,启用acking。这意味着如果您的拓扑结构很简单:
喷口->螺栓
喷口可以:
其中tupleid是一个增量计数器
count++
. 通过这种方式,您可以声明对等待确认的新元组进行风暴处理。同时,在连续螺栓和拓扑中的所有连续螺栓中,或者至少在导致瓶颈的螺栓之前,您将编写:
通过这种方式,您将注意到元组已被完全处理。
至少但不是最后,在声明拓扑生成器的main方法中,必须定义喷口将等待的最大元组数:
通过这种方式,喷口将开始生成等待确认的新元组,如果(在本例中)挂起的元组数超过100,喷口将停止调用nexttuple方法,等待它们被确认,然后生成新元组。
n、 b:值100只是一个例子,您可能需要对它进行一些调整,以使其适合您的情况。
rahim共享的链接应该足以理解该机制,无论如何,如果您想深入到实现,我添加以下链接:
http://storm.apache.org/releases/current/acking-framework-implementation.html
bprjcwpo2#
storm有两种螺栓类型:irichbolt和ibasicbolt。如果您实现了ibasicbolt,那么您也应该实现acknowledge。此外,您应该发送确认在您的螺栓,以防止锁定。这些链接是好的:
http://storm.apache.org/releases/1.0.6/concepts.htmlhttphttp://storm.apache.org/releases/1.2.2/message-processing.html