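I would like to implement a Flow that handles paginated results (for example, the underlying service returns some results, but also indicates that more results are available by making another request, e.g. passing in a cursor).
Things I've done so far:
1. I have implemented the following flow and test, but the flow doesn't complete.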
object AdditionalRequestsFlow {

  private def keepRequest[Request, Response](flow: Flow[Request, Response, NotUsed]): Flow[Request, (Request, Response), NotUsed] = {
    Flow.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._
      val in = builder.add(Flow[Request])
      val bcast = builder.add(Broadcast[Request](2))
      val merge = builder.add(Zip[Request, Response]())
      in ~> bcast ~> merge.in0
      bcast ~> flow ~> merge.in1
      FlowShape(in.in, merge.out)
    })
  }

  def flow[Request, Response, Output](
    inputFlow: Flow[Request, Response, NotUsed],
    anotherRequest: (Request, Response) => Option[Request],
    extractOutput: Response => Output,
    mergeOutput: (Output, Output) => Output
  ): Flow[Request, Output, NotUsed] = {
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val start = b.add(Flow[Request])
      val merge = b.add(Merge[Request](2))
      val underlying = b.add(keepRequest(inputFlow))
      val unOption = b.add(Flow[Option[Request]].mapConcat(_.toList))
      val unzip = b.add(UnzipWith[(Request, Response), Response, Option[Request]] { case (req, res) =>
        (res, anotherRequest(req, res))
      })
      val finish = b.add(Flow[Response].map(extractOutput)) // this is wrong as we don't keep to 1 Request -> 1 Output, but first let's get the flow to work
      start ~> merge ~> underlying ~> unzip.in
      unzip.out0 ~> finish
      merge <~ unOption <~ unzip.out1
      FlowShape(start.in, finish.out)
    })
  }
}
The test:
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.scalatest.FlatSpec
import org.scalatest.Matchers._
import cats.syntax.option._
import org.scalatest.concurrent.ScalaFutures.whenReady

class AdditionalRequestsFlowSpec extends FlatSpec {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  case class Request(max: Int, batchSize: Int, offset: Option[Int] = None)
  case class Response(values: List[Int], nextOffset: Option[Int])

  private val flow: Flow[Request, Response, NotUsed] = {
    Flow[Request]
      .map { request =>
        val start = request.offset.getOrElse(0)
        val end = Math.min(request.max, start + request.batchSize)
        val nextOffset = if (end == request.max) None else Some(end)
        val result = Response((start until end).toList, nextOffset)
        result
      }
  }

  "AdditionalRequestsFlow" should "collect additional responses" in {
    def anotherRequest(request: Request, response: Response): Option[Request] = {
      response.nextOffset.map { nextOffset => request.copy(offset = nextOffset.some) }
    }
    def extract(x: Response): List[Int] = x.values
    def merge(a: List[Int], b: List[Int]): List[Int] = a ::: b

    val requests =
      Request(max = 35, batchSize = 10) ::
      Request(max = 5, batchSize = 10) ::
      Request(max = 100, batchSize = 1) ::
      Nil

    val expected = requests.map { x =>
      (0 until x.max).toList
    }

    val future = Source(requests)
      .via(AdditionalRequestsFlow.flow(flow, anotherRequest, extract, merge))
      .runWith(Sink.seq)

    whenReady(future) { x =>
      x shouldEqual expected
    }
  }
}
2. Implemented the same flow in a horrible, blocking way to illustrate what I'm trying to achieve:
import scala.concurrent.Await
import scala.concurrent.duration._

def uglyHackFlow[Request, Response, Output](
  inputFlow: Flow[Request, Response, NotUsed],
  anotherRequest: (Request, Response) => Option[Request],
  extractOutput: Response => Output,
  mergeOutput: (Output, Output) => Output
): Flow[Request, Output, NotUsed] = {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  Flow[Request]
    .map { x =>
      // Recursively fetches pages, blocking on each one, and merges their outputs.
      def grab(request: Request): Output = {
        val response = Await.result(Source.single(request).via(inputFlow).runWith(Sink.head), 10.seconds) // :(
        val another = anotherRequest(request, response)
        val output = extractOutput(response)
        another.map { another =>
          mergeOutput(output, grab(another))
        } getOrElse output
      }
      grab(x)
    }
}
This works (but we should not be materializing anything or Await-ing at this point).
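3. Reviewed http://doc.akka.io/docs/akka/2.4/scala/stream/stream-graphs.html#Graph_cycles__liveness_and_deadlocks, which I believe contains the answer, yet I can't seem to find it there. In my case, I expect the cycle to contain one element most of the time, so neither buffer overflow nor complete starvation should happen, but evidently one of them does.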
4. Tried to debug the stream using .withAttributes(Attributes(LogLevels(...))), but it doesn't produce any output even though the loggers seem to be configured correctly.
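I'm looking for hints on how to fix the flow method while keeping the same signature and semantics (so that the test passes).
Or am I doing something completely off base here (e.g. is there an existing feature in akka-stream-contrib that solves this)?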
3 Answers
irtuqstp1#
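I think using Source.unfold is much safer than creating a custom graph.
In your case, you probably don't even need to push the cursor down the stream; I do it because I store the last successful cursor in a database so I can resume after a failure.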
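As a rough illustration of the Source.unfold idea (not this answer's original snippet), here is a minimal sketch. It assumes Akka 2.6+, where an implicit ActorSystem supplies the materializer. Page, fetchPage and the default max/batchSize values are hypothetical stand-ins for the real service, modelled on the test flow in the question. The unfold state is the next cursor to fetch, and each emitted element carries the cursor alongside its items so that a downstream stage could persist it.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object UnfoldPagination {
  // Hypothetical page type: the items plus the cursor of the next page, if any.
  final case class Page(items: List[Int], nextCursor: Option[Int])

  // Hypothetical stand-in for the real service: serves 0 until max in batches.
  def fetchPage(cursor: Int, max: Int = 35, batchSize: Int = 10): Page = {
    val end = math.min(max, cursor + batchSize)
    Page((cursor until end).toList, if (end == max) None else Some(end))
  }

  // State is Option[Int]: Some(cursor) means "fetch this page next";
  // None means the previous page was the last one, which completes the stream.
  val pages: Source[(Int, List[Int]), NotUsed] =
    Source.unfold[Option[Int], (Int, List[Int])](Some(0)) {
      case Some(cursor) =>
        val page = fetchPage(cursor)
        Some((page.nextCursor, (cursor, page.items)))
      case None =>
        None
    }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("unfold-pagination")
    // Blocking here is only for the demo; the stream itself never blocks.
    val result = Await.result(pages.runWith(Sink.seq), 5.seconds)
    result.foreach { case (cursor, items) => println(s"cursor=$cursor items=$items") }
    system.terminate()
  }
}
```

Because the stream completes as soon as the state becomes None, there is no feedback cycle that has to be shut down by hand. If the cursor is not needed downstream, the emitted element can be just page.items.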
xfb7svmp2#
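It feels like this video covers the gist of what you're trying to do. They create a custom GraphStage that maintains state and sends it back to the server, and the stream of responses depends on the state that was sent back. They also have an event to signal completion (in your case, that would be the point where you check whether another request is needed).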
htzpubme3#
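You can use Source.unfoldAsync. I've prepared a simple project that sends requests to a REST API, retrieves the data and next_page_url from each response, and uses next_page_url to fetch the next chunk of data. The process iterates until next_page_url is None. At each step the stream accumulates data, and in the end all the collected data is combined into a single Seq.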
The full source code can be found over on GitHub.
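The loop described above can be sketched roughly as follows. This is not the code from the linked project: PageResponse, fakeApi, fetch and the URLs are hypothetical, with an in-memory map standing in for the REST API, and Akka 2.6+ is assumed so that the implicit ActorSystem provides the materializer.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object UnfoldAsyncPagination {
  // Hypothetical response shape: one chunk of data plus an optional next-page URL.
  final case class PageResponse(data: Seq[String], nextPageUrl: Option[String])

  // Hypothetical in-memory stand-in for the REST API.
  private val fakeApi: Map[String, PageResponse] = Map(
    "/items?page=1" -> PageResponse(Seq("a", "b"), Some("/items?page=2")),
    "/items?page=2" -> PageResponse(Seq("c", "d"), Some("/items?page=3")),
    "/items?page=3" -> PageResponse(Seq("e"), None)
  )

  // A real implementation would issue an HTTP request here.
  def fetch(url: String): Future[PageResponse] = Future.successful(fakeApi(url))

  // Follows nextPageUrl until it is None, then folds all chunks into one Seq.
  def allData(firstUrl: String)(implicit system: ActorSystem): Future[Seq[String]] = {
    import system.dispatcher // execution context for Future.map
    Source
      .unfoldAsync[Option[String], Seq[String]](Some(firstUrl)) {
        case Some(url) => fetch(url).map(page => Some((page.nextPageUrl, page.data)))
        case None      => Future.successful(None)
      }
      .runFold(Seq.empty[String])(_ ++ _)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("unfold-async-pagination")
    // Blocking is only for the demo; callers would normally compose the Future.
    println(Await.result(allData("/items?page=1"), 5.seconds))
    system.terminate()
  }
}
```

Unlike the blocking version in the question, nothing here awaits inside the stream: unfoldAsync requests the next page only after the previous Future completes and downstream signals demand.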