不完全消耗Akka流的问题

w8f9ii69  于 2023-01-13  发布在  其他
关注(0)|答案(1)|浏览(191)

我们有一个服务使用Alpakka 3.0.4(Scala 2.13)从S3流传输多个大文件,压缩它们,并将压缩后的流作为HTTP响应发送出去,其思想是在压缩完成之前就开始发送,在文件到达之前就开始压缩,等等--所有这些都在背压方面进行了精心管理,这正是Akka Streams擅长的。
顺便说一下,HTTP服务器是Aleph,因为代码库实际上是Clojure,我们从Clojure调用Scala(这根本不是问题--事实上,比从Clojure调用Java容易得多)。Aleph支持以块为单位流式传输HTTP响应,这实际上是它的默认模式。
如果HTTP响应被完全消耗,一切都正常。响应的主体是压缩的输入流。一旦客户端完全消耗了它,流就被关闭,Akka会关闭所有其他流,包括原始的S3输入流。
但偶尔,客户端会在压缩流完全消耗完之前过早关闭连接(仍在调查原因),最终用户只会得到一个空的zip文件。我们在服务器端观察到S3流没有正确关闭,这导致连接泄漏,直到S3连接池耗尽。
处理这种不完全消费的正确方法是什么?
查看InputStreamSource类的源代码时,我注意到以下几点:

override def postStop(): Unit = {
        if (!isClosed) {
          mat.tryFailure(new AbruptStageTerminationException(this))
        }
      }

      private def failStream(reason: Throwable): Unit = {
        closeInputStream()
        mat.tryFailure(new IOOperationIncompleteException(readBytesTotal, reason))
      }

failStream中的流被关闭时,有什么特殊的原因不关闭postStop中的流吗?如果我们在一个图上调用runWith,但得到的输入流没有被完全使用,它将在什么时候被调用?或者我应该查看其他内容吗?
更新,回应评论:
这个图非常简单,原始源代码由一系列元组(filename和Source,由输入流构造)构成,这些元组流经Archive.zip并进入输入流接收器。Clojure代码大致如下:

(let [tuples (reify Iterable
                  (iterator [_]
                    ;; Some wrapper code that ultimately calls this for each item:
                    (Tuple2/apply (ArchiveMetadata/create filename)
                                  (StreamConverters/fromInputStream
                                    (reify Function0 (apply [_] attachment))))))
         source (Source/apply tuples)
         graph  (.via source (Archive/zip))
         sink   (StreamConverters/asInputStream
                  (FiniteDuration/apply ^long timeout TimeUnit/SECONDS))
         mat    (Materializer/matFromSystem system)]
     (.runWith graph sink mat))))

Scala的等价物如下所示:

val tuples = [some-iterable].map {
  case Something(filename, attachment) =>
    (ArchiveMetadata.create(filename),
     StreamConverters.fromInputStream(attachment)
}

val source = Source(tuples)
val graph = source.via(Archive.zip())
val sink = StreamConverters.asInputStream(timeout seconds)

// Asssuming an implicit materializer from an ActorSystem
graph.runWith(sink) // returns an InputStream

输入流(attachment,见上文)通常是S3流,但也可以来自其他存储库。它们不是通过调用Akka获得的。对于S3,它们是通过调用amazonica获得的,amazonica是Java AWS API的一个瘦Clojure Package 器。
更新2:为了重现此问题,我们使用了--headcurl请求。如果使用wget而不是curl,则会完全下载压缩文件,并且不会发现此问题(所有的流都被关闭)。如果我们通过请求头部而停止,压缩流(.runWith返回的流)被关闭,但原始S3流(上面代码中的attachment)未被关闭。
如果在执行.runWith之前放置了断点,那么它也是可重现的。因此,可能涉及到争用条件。在服务器端不会抛出异常。

6ss1mwsb

6ss1mwsb1#

因此,Akka接收器没有正确地保护压缩的输入流,以免在源文件完全压缩之前耗尽。如果在结果InputStream上调用read,并且它还没有数据,则整个阶段关闭
输入流是akka.stream.impl.io.InputStreamAdapter的一个示例(它实现java.io.InputStream),在read的实现中包含以下代码:

sharedBuffer.poll(readTimeout.toMillis, TimeUnit.MILLISECONDS) match {
                  case Data(data) =>
                    detachedChunk = Some(data)
                    readBytes(a, begin, length)
                  case Finished =>
// THIS IS WHART HAPPENS!!!
                    isStageAlive = false
                    -1
                  case Failed(ex) =>
                    isStageAlive = false
                    throw new IOException(ex)
                  case null        => throw new IOException("Timeout on waiting for new data")
                  case Initialized => throw new IllegalStateException("message 'Initialized' must come first")
                }

我的印象是Akka流既可以防止背压(当生产者比消费者快时),也可以防止早期耗尽(当消费者比生产者快时),但显然不是!
或者我使用了错误的接收器?我需要一个特殊的接收器来处理HTTP响应吗?不是Sink[_, InputStream],而是Sink[_, Future[HttpResponse]]或者Sink[_, manifold.deferred.IDeferred](因为我们使用Aleph而不是Akka Http?不过,如果使用了合适的接收器,我可以很容易地从Future转换为IDeferred
有什么建议吗?

相关问题