将一个状态完整的apache storm bolt插入到拓扑中,假设如下所示:
public class WordCountBolt extends BaseStatefulBolt<KeyValueState<String, Long>> {
private KeyValueState<String, Long> wordCounts;
private OutputCollector collector;
...
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void initState(KeyValueState<String, Long> state) {
wordCounts = state;
}
@Override
public void execute(Tuple tuple) {
String word = tuple.getString(0);
Integer count = wordCounts.get(word, 0);
count++;
wordCounts.put(word, count);
collector.emit(tuple, new Values(word, count));
collector.ack(tuple);
}
...
}
我需要触发一个方法来更新状态(wordcounts),比如说每x秒,独立于是否接收到事件。这在Apache风暴中是可能的吗?有没有可能简单地安排这样一个方法在定义的时间间隔内重复运行?
public void updateState() {
wordCounts.put("NewKey", 1);
}
1条答案
按热度按时间a5g8bdjr1#
我不知道为什么需要定期更新状态,但是如果需要经常运行一些代码,tick tuples可能适合您。https://kitmenke.com/blog/2014/08/04/tick-tuples-within-storm/