如何在apachestorm中存储临时数据?
在storm拓扑中,bolt需要访问先前处理的数据。
Eg: if the bolt processes varaiable1 with result as 20 at 10:00 AM.
一次又一次 varaiable1
作为接收 50
在 10:15 AM
那么结果应该是 30 (50-20)
如果varaiable1收到 70
那么结果应该是 20 (70-50)
在 10:30
.
如何实现这一功能。
如何在apachestorm中存储临时数据?
在storm拓扑中,bolt需要访问先前处理的数据。
Eg: if the bolt processes varaiable1 with result as 20 at 10:00 AM.
一次又一次 varaiable1
作为接收 50
在 10:15 AM
那么结果应该是 30 (50-20)
如果varaiable1收到 70
那么结果应该是 20 (70-50)
在 10:30
.
如何实现这一功能。
5条答案
按热度按时间6pp0gazn1#
有几种方法可以做到这一点,但这取决于您的系统需求、团队技能和基础设施。
您可以使用apachecassandra存储事件,并在元组中传递行的键,以便下一个bolt可以检索它。
如果您的数据本质上是时间序列,那么您可能想看看opentsdb或xdb。
当然,您可以退回到软件事务内存之类的东西,但我认为这需要大量的手工制作。
sy5wg1nm2#
恐怕到今天为止还没有这样的内置功能。但是您可以使用任何类型的分布式缓存,比如memcached或redis。这些缓存解决方案非常容易使用。
v9tzhpje3#
uou可以使用cachebuilder记住扩展baserichbolt中的数据(将其放入prepare方法中):
然后在execute中,您可以使用缓存查看是否已经看到该键条目。您可以从中添加业务逻辑:
ldxq2e6h4#
这个问题很适合演示apachespark在微批处理上的内存计算。然而,在storm中实现用例是微不足道的。
1) 确保螺栓使用字段分组。它将始终如一地将传入的元组散列到同一个螺栓上,这样我们就不会丢失任何元组。
2) 在博尔特的本地缓存中维护一个Map。此Map将保留“变量”的最后一个已知值。
vs91vp4v5#
简言之,您希望使用in storm的运行元组进行微批处理计算。首先需要在元组集中定义/查找键。使用该键在螺栓之间进行字段分组(不要使用随机分组)。这将保证相关元组始终发送到同一密钥的下游螺栓的相同任务。定义类级集合列表/Map来维护旧值并在同一个集合中添加新值以进行计算,不用担心它们在同一螺栓的不同执行器示例之间是线程安全的。