我有一个 akka-http API,用户将 S3 URL 发送到服务器。服务器随后从 AWS 服务器启动流,并对该源(Source)执行后续操作。然而,我想在执行任何操作之前验证传入流(stream)的大小。但我们不能使用 akka-http 的 withSizeLimit 指令,所以我为此创建了一个自定义实现。
/**
 * Stream attribute describing an upper bound on the number of bytes a stream may carry.
 *
 * @param maxBytes      maximum number of bytes allowed; a negative value disables the limit
 * @param contentLength length declared up front, if any
 */
final case class SizeLimit(maxBytes: Long, contentLength: Option[Long] = None) extends Attributes.Attribute {

  /** True when `maxBytes` is negative, i.e. no limit applies. */
  def isDisabled: Boolean = maxBytes < 0
}
/** Factory helpers that attach a [[SizeLimit]] (and, when enabled, a counting stage) to a source. */
object Limitable {

  /** Default attributes (just a name) given to every `Limitable` stage. */
  private val limitableDefaults = Attributes.name("limitable")

  /** Applies `limit` to a source of `ByteString`s, measuring each element by its `size`. */
  def applyForByteStrings[Mat](source: Source[ByteString, Mat], limit: SizeLimit): Source[ByteString, Mat] =
    applyLimit(source, limit)(bytes => bytes.size)

  /** Applies `limit` to a source of chunks, measuring each element by the `size` of its `data`. */
  def applyForChunks[Mat](source: Source[ChunkStreamPart, Mat], limit: SizeLimit): Source[ChunkStreamPart, Mat] =
    applyLimit(source, limit)(chunk => chunk.data.size)

  /**
   * Attaches `limit` as an attribute of `source`; unless the limit is disabled, a
   * `Limitable` stage is appended first.
   *
   * @param source the source to guard
   * @param limit  the limit to record as an attribute
   * @param sizeOf measures a single element
   * @return the source carrying the `limit` attribute
   */
  def applyLimit[T, Mat](source: Source[T, Mat], limit: SizeLimit)(sizeOf: T => Int): Source[T, Mat] = {
    // With a disabled limit there is no need to add the stage: it's either there already or not needed.
    val guarded = if (limit.isDisabled) source else source.via(new Limitable(sizeOf))
    guarded.withAttributes(Attributes(limit))
  }
}
/**
 * Pass-through flow stage that subtracts the size of every element from a byte budget and
 * fails the stream with an `EntityStreamSizeException` once that budget goes negative.
 *
 * The budget is not a constructor argument: it is read from the [[SizeLimit]] attribute
 * in `preStart`.
 *
 * @param sizeOf measures a single element (the result is subtracted from the remaining bytes)
 * @tparam T element type flowing through the stage
 */
final class Limitable[T](sizeOf: T => Int) extends GraphStage[FlowShape[T, T]] {
  val in: Inlet[T] = Inlet[T]("Limitable.in")
  val out: Outlet[T] = Outlet[T]("Limitable.out")

  // Debug counters for the number of onPull / onPush invocations.
  // NOTE(review): these live on the stage instance rather than inside the GraphStageLogic,
  // so every logic created from the same stage instance updates the same counters —
  // confirm this is intended.
  var numPullCalls = 0
  var numPushCalls = 0

  override val shape: FlowShape[T, T] = FlowShape(in, out)

  override protected val initialAttributes: Attributes = Limitable.limitableDefaults

  override def createLogic(_attributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      // Limit reported in the exception raised from onPush; stays -1 unless preStart sets it.
      private var maxBytes = -1L
      // Remaining byte budget; Long.MaxValue unless preStart sets it.
      private var bytesLeft = Long.MaxValue

      @nowarn("msg=deprecated") // we need getFirst semantics
      override def preStart(): Unit =
        _attributes.getFirst[SizeLimit].foreach { limit =>
          // A disabled limit means "no limit": leave the defaults untouched.
          if (!limit.isDisabled) {
            limit.contentLength match {
              case Some(declaredLength) =>
                // Length is known up front: fail immediately when it exceeds the limit,
                // else we still count but never throw an error.
                if (declaredLength > limit.maxBytes)
                  failStage(EntityStreamSizeException(limit.maxBytes, limit.contentLength))
              case None =>
                maxBytes = limit.maxBytes
                bytesLeft = limit.maxBytes
            }
          }
        }

      override def onPush(): Unit = {
        numPushCalls += 1
        println(s"Push calls $numPushCalls")
        val element = grab(in)
        val elementSize = sizeOf(element)
        println(s"Elem size is $elementSize")
        bytesLeft -= elementSize
        if (bytesLeft < 0) {
          // Budget exhausted: fail instead of forwarding the element.
          println(s"EntityStreamSizeException Bytes left $bytesLeft")
          failStage(EntityStreamSizeException(maxBytes))
        } else {
          push(out, element)
        }
      }

      override def onPull(): Unit = {
        numPullCalls += 1
        println(s"Pull calls $numPullCalls")
        pull(in)
      }

      setHandlers(in, out, this)
    }
}
// Demo: run a local file through the size limiter and hand the result to Tika.
val filePath = Paths.get("/Users/<username>/Documents/bigfile.pdf")
val fileSource: Source[ByteString, Any] = FileIO.fromPath(filePath)

// Limit of 4,000,000 bytes; no content length is supplied.
val res = Limitable.applyForByteStrings(fileSource, SizeLimit(maxBytes = 4000000L))

val sink = StreamConverters.asInputStream()
val result = res.runWith(sink)

// NOTE(review): nothing in this snippet reads from `tis`. According to the answer that
// accompanies this code, an unconsumed stream only processes 16 elements (the default
// Sink size), so the limit is never reached and no EntityStreamSizeException is raised.
val tis = TikaInputStream.get(result)
字符串
这是自定义实现参考:https://github.com/akka/akka-http/blob/main/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala
使用该自定义实现后,如果文件大小超过 4 MB,我希望服务器抛出 EntityStreamSizeException,但它不会抛出任何异常。
1条答案
按热度按时间cl25kdpy1#
文档中说:
字符串
我没有在任何地方消费输出流,因此只处理了 16 个元素,这是默认的 Sink 大小。