我想知道在apachestorm中处理以下问题的最佳实践是什么。
我有一个喷口,它生成一个带有显式时间戳的整数值流。目标是在此流上使用三个滑动窗口执行最小/最大聚合:
最后一小时
最后一天,即最后24小时
最后一小时很简单:
topology.setBolt("1h", ...)
.shuffleGrouping("spout")
.withWindow(Duration.hours(1), Duration.seconds(10))
.withTimestampField("timestamp"));
但是,在较长的时间内,我关心的是窗口的队列大小。当我从喷口直接消费元组时,就像最后一个小时的聚合一样,每一个元组都会在队列中结束。
一种可能是使用预先聚合的“1h”bolt中的元组。但是,由于我使用的是显式时间戳,因此忽略了从“1h”螺栓到达的延迟元组。不能选择1小时的延迟,因为这会延迟窗口的评估。有没有一种方法可以“允许”延迟元组而不影响结果的及时性?
当然,我也可以每小时存储一个聚合,然后计算过去24小时的最小值,包括“1h”流中的最新值。但我很好奇,是否有一种方法可以做到这一点正确使用风暴手段。
更新1
多亏了arunmahadevan的回答,我修改了1hmin的bolt,以在相应的1h窗口中发出具有所有元组的最大时间戳的最小元组。这样,消耗螺栓就不会因为延迟到达而丢弃元组。我还介绍了一个新领域 original-timestamp
保留最小元组的原始时间戳。
更新2
我终于找到了一个更好的方法,只在1hmin螺栓中发射状态变化。只要没有收到新的元组,storm就不会提前消耗bolt中的时间,因此可以防止延迟到达问题。而且,我可以保留原始的时间戳,而不将其复制到单独的字段中。
1条答案
按热度按时间nuypyhwy1#
我认为周期性地将min从“1h”发送到“24h”螺栓应该起作用,并控制“24h”队列大小。
如果配置滞后,则只有在该滞后之后(即,当事件时间跨越滑动间隔+滞后时)才会调用bolt的execute。
假设将“1h”bolt配置为1分钟的延迟,那么只有在事件时间经过02:01之后,才会对01:00到02:00之间的元组调用execute(i、 e.螺栓已看到时间戳>=02:01)的事件。但是,execute将只接收01:00到02:00之间的元组。
现在,如果您计算最后一个小时的最小值,并将结果发送到一个滑动间隔为1小时且滞后=0的“24小时”螺栓,则一旦传入事件的时间戳穿过下一个小时,它就会触发。如果发出时间戳为02:00的01:00-02:00 min,“24小时”窗口将在收到min事件后立即触发(对于前一天02:00到02:00之间的事件),因为事件时间跨越了下一个小时,并且配置的延迟为0。