使用scala和Akka在不丢失数据的情况下使用服务器发送事件(SSE)

bsxbgnwa  于 2022-11-05  发布在  Scala
关注(0)|答案(1)|浏览(189)

我想在生产率〉消耗率时消耗SSE事件而不丢失任何数据。由于SSE支持反压力,Akka应该可以做到这一点。我尝试了几种不同的方法,但额外的消息被丢弃。

@Singleton
class SseConsumer @Inject()()(implicit ec: ExecutionContext) {

  implicit val system = ActorSystem()

  val send: HttpRequest => Future[HttpResponse] = foo

  def foo(x: HttpRequest) = {
    try {
      val authHeader = Authorization(BasicHttpCredentials("user", "pass"))
      val newHeaders = x.withHeaders(authHeader)
      Http().singleRequest(newHeaders)
    } catch {
      case e: Exception => {
        println("Exceptio12n", e.printStackTrace())
        throw e
      }
    }
  }

  val eventSource2: Source[ServerSentEvent, NotUsed] =
    EventSource(
      uri = Uri("https://xyz/a/events/user"),
      send,
      initialLastEventId = Some("2"),
      retryDelay = 1.second
    )

  def orderStatusEventStable() = {
    val events: Future[immutable.Seq[ServerSentEvent]] =
      eventSource2
        .throttle(elements = 1, per = 3000.milliseconds, maximumBurst = 1, ThrottleMode.Shaping)
        .take(5)
        .runWith(Sink.seq)
    events.map(_.foreach(x => {
      // TODO: push to sqs
      println("456")
      println(x.data)
    }))
  }

  Future {
    blocking {
      while (true) {
        try {
          Await.result(orderStatusEventStable() recover {
            case e: Exception => {
              println("exception", e)
              throw e
            }
          }, Duration.Inf)
        } catch {
          case e: Exception => {
            println("Exception", e.printStackTrace())
          }
        }
      }
    }
  }
}

此代码可以正常工作,但存在以下问题:
1.由于.take(5),当消耗率〈生产率时,我正在丢弃事件。
1.我也想处理每一个消息,因为它来了,而不想等到5个消息已经达到。我怎么能做到这一点?
1.我必须在while循环中编写消费者。这似乎不是基于事件的,而是看起来像轮询(非常类似于使用分页和限制5调用GET)
1.我不确定是否要节流,尝试阅读文件,但非常混乱。如果我不想遗失任何事件,节流是正确的方法吗?我希望在高峰时段的速率为5000 req / sec,否则为10 req/sec。当生产率很高时,我最好施加背压。节流是正确的方法吗?根据文档,它似乎是正确的,因为它说Backpressures when downstream backpressures or the incoming rate is higher than the speed limit

56lgkhnf

56lgkhnf1#

为了使Akka Stream回压工作,您必须只使用一个源,而不是每次都用新的源重新创建一种轮询。
忘记你的循环和你的def orderStatusEventStable
仅执行以下操作(一次):

eventSource2
  .operator(event => /* do something */ )
  .otherOperatorMaybe()
  ...
  .runWith(Sink.foreach(println))

operatorotherOperatorMaybe是Akka Stream上的操作,这取决于您想要实现的目标(就像原始代码中的throttletake)。
运算符列表:https://doc.akka.io/docs/akka/current/stream/operators/index.html

  • akka 流很强大,但你需要花点时间去了解它 *

相关问题