如何在风暴中创建翻滚窗口有两个阈值。例如,如果我将windowcount设置为500,windowduration设置为5秒,那么即使消息少于500条,但已经过了5秒,也应该处理窗口。我可以看到这两种功能的独立api
计数
.tumblingWindow(1000, windowStoreFactory, new Fields("word"), new CountAsAggregator(), new Fields("count"))
为了时间
.tumblingWindow(Duration.seconds(5), windowStoreFactory, new Fields("word"), new CountAsAggregator(), new Fields("count"))
我能要两者的组合吗?
如果我按messagecount而不是duration进行配置,那么当我停止拓扑时,我的消息会发生什么变化?即使没有收到批处理计数,storm也会处理这些消息吗?或者我会失去这些信息?
1条答案
按热度按时间4zcjmb1e1#
我不相信你能用当前的窗口api做到这一点。
代码是可插拔的,足以允许它在内部运行,但是您需要的api没有公开。有两个接口来定义如何处理窗口。
triggerpolicy决定何时将窗口传递到bolt(例如,“当缓冲了100个元组时传递”)。
executionpolicy决定何时从当前窗口中逐出元组(例如,“一旦元组比窗口中最新的元组落后500个元组,则丢弃元组”)。
您可以通过basewindowedbolt.withwindowlength等间接配置这些策略,它只在内部设置一些配置属性。这些属性用于确定windowedboltexecutor中的触发器/逐出策略。
我认为需要的是允许用户提供他们自己的定制triggerpolicy/executionpolicy,或者添加一个新的triggerpolicy/executionpolicy来做你想做的事情。
如果你想为此提出问题,可以在https://issues.apache.org/jira/projects/storm. 如果您想贡献代码,源代码可以在https://github.com/apache/storm,你也可以在那里提高公关。