我想在生产率〉消耗率时消耗SSE事件而不丢失任何数据。由于SSE支持反压力,Akka应该可以做到这一点。我尝试了几种不同的方法,但额外的消息被丢弃。
@Singleton
class SseConsumer @Inject()()(implicit ec: ExecutionContext) {
implicit val system = ActorSystem()
val send: HttpRequest => Future[HttpResponse] = foo
def foo(x: HttpRequest) = {
try {
val authHeader = Authorization(BasicHttpCredentials("user", "pass"))
val newHeaders = x.withHeaders(authHeader)
Http().singleRequest(newHeaders)
} catch {
case e: Exception => {
println("Exceptio12n", e.printStackTrace())
throw e
}
}
}
val eventSource2: Source[ServerSentEvent, NotUsed] =
EventSource(
uri = Uri("https://xyz/a/events/user"),
send,
initialLastEventId = Some("2"),
retryDelay = 1.second
)
def orderStatusEventStable() = {
val events: Future[immutable.Seq[ServerSentEvent]] =
eventSource2
.throttle(elements = 1, per = 3000.milliseconds, maximumBurst = 1, ThrottleMode.Shaping)
.take(5)
.runWith(Sink.seq)
events.map(_.foreach(x => {
// TODO: push to sqs
println("456")
println(x.data)
}))
}
Future {
blocking {
while (true) {
try {
Await.result(orderStatusEventStable() recover {
case e: Exception => {
println("exception", e)
throw e
}
}, Duration.Inf)
} catch {
case e: Exception => {
println("Exception", e.printStackTrace())
}
}
}
}
}
}
此代码可以正常工作,但存在以下问题:
1.由于.take(5)
,当消耗率〈生产率时,我正在丢弃事件。
1.我也想处理每一个消息,因为它来了,而不想等到5个消息已经达到。我怎么能做到这一点?
1.我必须在while循环中编写消费者。这似乎不是基于事件的,而是看起来像轮询(非常类似于使用分页和限制5调用GET)
1.我不确定是否要节流,尝试阅读文件,但非常混乱。如果我不想遗失任何事件,节流是正确的方法吗?我希望在高峰时段的速率为5000 req / sec,否则为10 req/sec。当生产率很高时,我最好施加背压。节流是正确的方法吗?根据文档,它似乎是正确的,因为它说Backpressures when downstream backpressures or the incoming rate is higher than the speed limit
1条答案
按热度按时间56lgkhnf1#
为了使Akka Stream回压工作,您必须只使用一个源,而不是每次都用新的源重新创建一种轮询。
忘记你的循环和你的
def orderStatusEventStable
。仅执行以下操作(一次):
operator
和otherOperatorMaybe
是Akka Stream上的操作,这取决于您想要实现的目标(就像原始代码中的throttle
和take
)。运算符列表:https://doc.akka.io/docs/akka/current/stream/operators/index.html