限制Akka流中的Source滴答频率,并始终对最新消息采取行动

1u4esq0p  于 2023-10-18  发布在  其他
关注(0)|答案(2)|浏览(191)

以下是问题的背景:
1.有一个源头,它不断地滴答作响,没有关于滴答频率的保证
1.我们希望限制源的最大滴答速率(例如,我们在DB中启动消息,并且我们不希望存储频率超过每7秒)
1.我们只对最新的事件感兴趣,所以如果在5秒的等待时间内有新的东西发出,我们只对它感兴趣。
这是我能想到的最好的:

  1. Source.tick(5.seconds, 3.seconds, 1)
  2. .scan(0)((a, b) => a + b) // have a counter
  3. .wireTap(num => logger.warn(s"up ${num.formatted("%02d")}"))
  4. .buffer(1, OverflowStrategy.dropHead)
  5. .throttle(1, 7.seconds)
  6. .wireTap(num => logger.warn(s"down ${num.formatted("%02d")}"))
  7. .runWith(Sink.ignore)(materializer)

这几乎像我希望的那样工作:有一个节流阀,不会让超过一个项目每7秒,有一个缓冲区之前,节流阀将保留一个单一的元素,并取代它与一个新的到来。
然而,当我检查日志时,我可以看到行为是次优的:

  1. up 01
  2. down 01
  3. up 02
  4. up 03
  5. down 02
  6. up 04
  7. up 05
  8. up 06
  9. down 03
  10. up 07
  11. up 08
  12. down 06
  13. up 09
  14. up 10
  15. down 08

throttle并不从缓冲区中获取最新的元素,而是使用最后一个被throttle的元素被释放时缓冲区中的元素。也就是说,它看起来不是暂停然后检查一个新元素,而是throttle接受一个元素并等待它,直到计时器完成。
有没有更好的方法来做到这一点?或者我应该实现自己的流程?

bn31dyow

bn31dyow1#

GraphStage中实现您自己的,您可以精确控制何时以及如何推/拉元素。
这里有一个例子

  1. class LastElementWithin[A](duration: FiniteDuration) extends GraphStage[FlowShape[A, A]] {
  2. private val in = Inlet[A]("LastElementWithin.in")
  3. private val out = Outlet[A]("LastElementWithin.out")
  4. override val shape: FlowShape[A, A] = FlowShape(in, out)
  5. private sealed trait CallbackEvent
  6. private case object Pull extends CallbackEvent
  7. private case object Push extends CallbackEvent
  8. private case object Flush extends CallbackEvent
  9. private case object Finish extends CallbackEvent
  10. override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
  11. new TimerGraphStageLogic(shape) with StageLogging {
  12. setHandlers(
  13. in = in,
  14. out = out,
  15. handler = new AbstractInOutHandler {
  16. override def onPush(): Unit = asyncCallback.invoke(Push)
  17. override def onPull(): Unit = asyncCallback.invoke(Pull)
  18. override def onUpstreamFinish(): Unit = asyncCallback.invoke(Finish)
  19. }
  20. )
  21. private val FlushTimerKey = "Flush"
  22. protected override def onTimer(timerKey: Any): Unit = {
  23. if (timerKey == FlushTimerKey) {
  24. asyncCallback.invoke(Flush)
  25. }
  26. }
  27. private val asyncCallback = createAsyncCallback(new Procedure[CallbackEvent] {
  28. private var last: Option[A] = None
  29. override def apply(param: CallbackEvent): Unit = {
  30. param match {
  31. case Pull => onPull()
  32. case Push => onPush()
  33. case Finish => onFinish()
  34. case Flush => flush()
  35. }
  36. }
  37. private def onPull(): Unit = {
  38. if (!isTimerActive(FlushTimerKey)) scheduleOnce(FlushTimerKey, duration)
  39. if (!hasBeenPulled(in)) pull(in)
  40. }
  41. private def onPush(): Unit = {
  42. last = Some(grab(in))
  43. pull(in)
  44. }
  45. private def onFinish(): Unit = {
  46. cancelTimer(FlushTimerKey)
  47. last.foreach(emit(out, _))
  48. completeStage()
  49. }
  50. private def flush(): Unit = {
  51. if (isAvailable(out)) {
  52. last.foreach(emit(out, _))
  53. scheduleOnce(FlushTimerKey, duration)
  54. }
  55. }
  56. })
  57. }
  58. }

在流动中运行

  1. implicit val as: ActorSystem = ActorSystem("test")
  2. val done = Source
  3. .tick(5.nanoseconds, 3.seconds, 1)
  4. .scan(0)((a, b) => a + b)
  5. .wireTap(num => println(s"up ${"%02d".format(num)}"))
  6. .via(Flow.fromGraph(new LastElementWithin(7.seconds)))
  7. .wireTap(num => println(s"down ${"%02d".format(num)}"))
  8. .toMat(Sink.ignore)(Keep.right)
  9. .run()

产生

  1. up 00
  2. up 01
  3. up 02
  4. up 03
  5. down 03
  6. up 04
  7. up 05
  8. down 05
  9. up 06
  10. up 07
  11. down 07
  12. up 08
  13. up 09
  14. up 10
  15. down 10
展开查看全部
l7wslrjt

l7wslrjt2#

要获取时间窗口内n个元素中的最后一个元素,您可以使用groupedWithin运算符,然后使用map,例如:

  1. Source
  2. .tick(5.seconds, 1.second, 1)
  3. .scan(0)((a, b) => a + b)
  4. .wireTap(num => println(s"up ${"%02d".format(num)}"))
  5. .groupedWithin(Int.MaxValue, 7.seconds)
  6. .map(_.last)
  7. .wireTap(num => println(s"down ${"%02d".format(num)}"))
  8. .runWith(Sink.ignore)

相关问题