以下是问题的背景:
1.有一个源头,它不断地滴答作响,没有关于滴答频率的保证
1.我们希望限制源的最大滴答速率(例如,我们在DB中启动消息,并且我们不希望存储频率超过每7秒)
1.我们只对最新的事件感兴趣,所以如果在5秒的等待时间内有新的东西发出,我们只对它感兴趣。
这是我能想到的最好的:
Source.tick(5.seconds, 3.seconds, 1)
.scan(0)((a, b) => a + b) // have a counter
.wireTap(num => logger.warn(s"up ${num.formatted("%02d")}"))
.buffer(1, OverflowStrategy.dropHead)
.throttle(1, 7.seconds)
.wireTap(num => logger.warn(s"down ${num.formatted("%02d")}"))
.runWith(Sink.ignore)(materializer)
这几乎像我希望的那样工作:有一个节流阀,不会让超过一个项目每7秒,有一个缓冲区之前,节流阀将保留一个单一的元素,并取代它与一个新的到来。
然而,当我检查日志时,我可以看到行为是次优的:
up 01
down 01
up 02
up 03
down 02
up 04
up 05
up 06
down 03
up 07
up 08
down 06
up 09
up 10
down 08
throttle并不从缓冲区中获取最新的元素,而是使用最后一个被throttle的元素被释放时缓冲区中的元素。也就是说,它看起来不是暂停然后检查一个新元素,而是throttle接受一个元素并等待它,直到计时器完成。
有没有更好的方法来做到这一点?或者我应该实现自己的流程?
2条答案
按热度按时间bn31dyow1#
在
GraphStage
中实现您自己的,您可以精确控制何时以及如何推/拉元素。这里有一个例子
在流动中运行
产生
l7wslrjt2#
要获取时间窗口内n个元素中的最后一个元素,您可以使用
groupedWithin
运算符,然后使用map
,例如: