### TOPOLOGY ###
# Wires the word-count pipeline together:
# RandomSentenceSpout -> SplitSentenceBolt -> WordCountBolt.
class WordCountTopology < DSL::Topology
  # Sentence source, declared with a parallelism of 2.
  spout RandomSentenceSpout, parallelism: 2

  # Splitter bolt; its source is the spout, declared with :shuffle.
  bolt SplitSentenceBolt, parallelism: 2 do
    source RandomSentenceSpout, :shuffle
  end

  # Counter bolt; its source is the splitter, declared with fields ["word"].
  bolt WordCountBolt, parallelism: 2 do
    source SplitSentenceBolt, fields: ["word"]
  end

  # Settings registered under the name :word_count. The block receives the
  # environment and applies two extra settings only when it is :cluster.
  configure :word_count do |env|
    debug true
    max_task_parallelism 4

    if env == :cluster
      num_workers 6
      max_spout_pending 1000
    end
  end

  # Submit hook: when the environment is :local, wait 60 seconds and then
  # shut the cluster down. Any other environment does nothing here.
  on_submit do |env|
    next unless env == :local

    sleep 60
    cluster.shutdown
  end
end
### SPOUT ###
# Spout that emits one randomly chosen sentence from a fixed pool on every
# on_send call, pausing 10 seconds before each emit.
class RandomSentenceSpout < DSL::Spout
  # The single declared output field is named :word; the value sent on it is
  # one of the whole sentences below.
  output_fields :word

  # Build the sentence pool that on_send picks from.
  on_init do
    @sentences = [
      "the cow jumped over the moon",
      "an apple a day keeps the doctor away",
      "four score and seven years ago",
      "snow white and the seven dwarfs",
      "i am at two with nature"
    ]
  end

  # The block's value is the sentence to send.
  on_send do
    # The pause belongs inside this block so it runs on every send. When it
    # sat directly in the class body it executed exactly once, while the
    # class was being defined, and never slowed emission down.
    # NOTE(review): assumes the 10 s pause was meant as an emit throttle --
    # confirm, and consider a shorter interval for real workloads.
    sleep(10)
    @sentences.sample
  end
end
### SPLITSENTENCEBOLT ###
# Bolt that breaks an incoming sentence into individual words.
class SplitSentenceBolt < DSL::Bolt
  output_fields :word

  # Reads the first value of the incoming tuple, splits it on whitespace,
  # and returns one single-element array per word.
  on_receive do |tuple|
    sentence = tuple[0]
    sentence.split(' ').map { |token| [token] }
  end
end
### WORDCOUNTBOLT ###
# Bolt that keeps a running count per word and reports the updated count
# each time a word arrives. Nothing in this class ever resets the counts.
class WordCountBolt < DSL::Bolt
  output_fields :word, :count

  # Counter table: a key that has not been seen yet starts at 0.
  on_init do
    @counts = Hash.new { |table, key| table[key] = 0 }
  end

  # Increments the count for the first value of the tuple (as a string) and
  # returns [word, new_count].
  on_receive do |tuple|
    key = tuple[0].to_s
    running_total = (@counts[key] += 1)
    [key, running_total]
  end
end
我想实现一个tick元组。我想每60秒做一次字数统计,输出结果,将计数器重置为0。
//可能是
//SplitSentenceBolt
函数:每60秒发送一次tick_tuple
//WordCountBolt
if(是tick元组){
发出结果
@counts=[]#重新初始化
}
有人能帮我实现吗?我是Storm的新手。
2条答案
按热度按时间vwoqyblh1#
我不懂Ruby,但希望这段Python代码能帮你弄清楚。这就是我们在bolt中的做法:
eit6fx6z2#
您可以直接使用Storm内置的tick tuple机制,让bolt按指定的时间间隔接收tick tuple。
在拓扑定义的bolt部分中,只需添加以下配置:
在bolt中,您可以像这样测试tick tuple: