storm中的分布式缓存

vltsax25  于 2021-06-21  发布在  Storm
关注(0)|答案(5)|浏览(419)

如何在apachestorm中存储临时数据?
在storm拓扑中,bolt需要访问先前处理的数据。

Eg: if the bolt processes varaiable1 with result as 20 at 10:00 AM.

一次又一次 varaiable1 作为接收 5010:15 AM 那么结果应该是 30 (50-20) 如果varaiable1收到 70 那么结果应该是 20 (70-50)10:30 .
如何实现这一功能。

6pp0gazn

6pp0gazn1#

有几种方法可以做到这一点,但这取决于您的系统需求、团队技能和基础设施。
您可以使用apachecassandra存储事件,并在元组中传递行的键,以便下一个bolt可以检索它。
如果您的数据本质上是时间序列,那么您可能想看看opentsdb或xdb。
当然,您可以退回到软件事务内存之类的东西,但我认为这需要大量的手工制作。

sy5wg1nm

sy5wg1nm2#

恐怕到今天为止还没有这样的内置功能。但是您可以使用任何类型的分布式缓存,比如memcached或redis。这些缓存解决方案非常容易使用。

v9tzhpje

v9tzhpje3#

uou可以使用cachebuilder记住扩展baserichbolt中的数据(将其放入prepare方法中):

// init your cache.
this.cache = CacheBuilder.newBuilder()
                         .maximumSize(maximumCacheSize)
                         .expireAfterWrite(expireAfterWrite, TimeUnit.SECONDS)
                         .build();

然后在execute中,您可以使用缓存查看是否已经看到该键条目。您可以从中添加业务逻辑:

// if we haven't seen it before, we can emit it.
if(this.cache.getIfPresent(key) == null) {
    cache.put(key, nearlyEmptyList);
    this.collector.emit(input, input.getValues());
}

this.collector.ack(input);
ldxq2e6h

ldxq2e6h4#

这个问题很适合演示apachespark在微批处理上的内存计算。然而,在storm中实现用例是微不足道的。
1) 确保螺栓使用字段分组。它将始终如一地将传入的元组散列到同一个螺栓上,这样我们就不会丢失任何元组。
2) 在博尔特的本地缓存中维护一个Map。此Map将保留“变量”的最后一个已知值。

class CumulativeDiffBolt extends InstrumentedBolt{

Map<String, Integer> lastKnownVariableValue;

@Override
public void prepare(){
     this.lastKnownVariableValue = new HashMap<>();
     ....

@Override
public void instrumentedNextTuple(Tuple tuple, Collector collector){
     .... extract variable from tuple
     .... extract current value from tuple
     Integer lastValue = lastKnownVariableValue.getOrDefault(variable, 0)
     Integer newValue = currValue - lastValue

     lastKnownVariableValue.put(variable, newValue)
     emit(new Fields(variable, newValue));
   ...
}
vs91vp4v

vs91vp4v5#

简言之,您希望使用in storm的运行元组进行微批处理计算。首先需要在元组集中定义/查找键。使用该键在螺栓之间进行字段分组(不要使用随机分组)。这将保证相关元组始终发送到同一密钥的下游螺栓的相同任务。定义类级集合列表/Map来维护旧值并在同一个集合中添加新值以进行计算,不用担心它们在同一螺栓的不同执行器示例之间是线程安全的。

相关问题