Akka stream for processing paginated results never completes

nfs0ujit, posted 2024-01-08 in Other

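I would like to implement a Flow that handles paginated results (e.g. the underlying service returns some results, but also indicates that more results are available by making another request, for instance by passing in a cursor).
What I have done so far:
1. I have implemented the flow and test below, but the flow does not complete.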

    import akka.NotUsed
    import akka.stream.FlowShape
    import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, UnzipWith, Zip}

    object AdditionalRequestsFlow {

      // Pairs each response with the request that produced it.
      private def keepRequest[Request, Response](flow: Flow[Request, Response, NotUsed]): Flow[Request, (Request, Response), NotUsed] = {
        Flow.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
          import GraphDSL.Implicits._
          val in = builder.add(Flow[Request])
          val bcast = builder.add(Broadcast[Request](2))
          val merge = builder.add(Zip[Request, Response]())
          in ~> bcast ~> merge.in0
          bcast ~> flow ~> merge.in1
          FlowShape(in.in, merge.out)
        })
      }

      def flow[Request, Response, Output](
        inputFlow: Flow[Request, Response, NotUsed],
        anotherRequest: (Request, Response) => Option[Request],
        extractOutput: Response => Output,
        mergeOutput: (Output, Output) => Output
      ): Flow[Request, Output, NotUsed] = {
        Flow.fromGraph(GraphDSL.create() { implicit b =>
          import GraphDSL.Implicits._
          val start = b.add(Flow[Request])
          val merge = b.add(Merge[Request](2))
          val underlying = b.add(keepRequest(inputFlow))
          val unOption = b.add(Flow[Option[Request]].mapConcat(_.toList))
          val unzip = b.add(UnzipWith[(Request, Response), Response, Option[Request]] { case (req, res) =>
            (res, anotherRequest(req, res))
          })
          val finish = b.add(Flow[Response].map(extractOutput)) // this is wrong as we don't keep to 1 Request -> 1 Output, but first let's get the flow to work
          start ~> merge ~> underlying ~> unzip.in
          unzip.out0 ~> finish
          // Feedback edge: follow-up requests are sent back into the merge.
          merge <~ unOption <~ unzip.out1
          FlowShape(start.in, finish.out)
        })
      }
    }

Test:

    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Flow, Sink, Source}
    import org.scalatest.FlatSpec
    import org.scalatest.Matchers._
    import cats.syntax.option._
    import org.scalatest.concurrent.ScalaFutures.whenReady

    class AdditionalRequestsFlowSpec extends FlatSpec {
      implicit val system = ActorSystem()
      implicit val materializer = ActorMaterializer()

      case class Request(max: Int, batchSize: Int, offset: Option[Int] = None)
      case class Response(values: List[Int], nextOffset: Option[Int])

      private val flow: Flow[Request, Response, NotUsed] = {
        Flow[Request]
          .map { request =>
            val start = request.offset.getOrElse(0)
            val end = Math.min(request.max, start + request.batchSize)
            val nextOffset = if (end == request.max) None else Some(end)
            val result = Response((start until end).toList, nextOffset)
            result
          }
      }

      "AdditionalRequestsFlow" should "collect additional responses" in {
        def anotherRequest(request: Request, response: Response): Option[Request] = {
          response.nextOffset.map { nextOffset => request.copy(offset = nextOffset.some) }
        }
        def extract(x: Response): List[Int] = x.values
        def merge(a: List[Int], b: List[Int]): List[Int] = a ::: b

        val requests =
          Request(max = 35, batchSize = 10) ::
          Request(max = 5, batchSize = 10) ::
          Request(max = 100, batchSize = 1) ::
          Nil
        val expected = requests.map { x =>
          (0 until x.max).toList
        }
        val future = Source(requests)
          .via(AdditionalRequestsFlow.flow(flow, anotherRequest, extract, merge))
          .runWith(Sink.seq)
        whenReady(future) { x =>
          x shouldEqual expected
        }
      }
    }


2. I have implemented the same flow in a horrible, blocking way to illustrate what I am trying to achieve:

    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Flow, Sink, Source}
    import scala.concurrent.Await
    import scala.concurrent.duration._

    def uglyHackFlow[Request, Response, Output](
      inputFlow: Flow[Request, Response, NotUsed],
      anotherRequest: (Request, Response) => Option[Request],
      extractOutput: Response => Output,
      mergeOutput: (Output, Output) => Output
    ): Flow[Request, Output, NotUsed] = {
      implicit val system = ActorSystem()
      implicit val materializer = ActorMaterializer()
      Flow[Request]
        .map { x =>
          // Fetches one page, then recurses while a follow-up request exists.
          def grab(request: Request): Output = {
            val response = Await.result(Source.single(request).via(inputFlow).runWith(Sink.head), 10.seconds) // :(
            val another = anotherRequest(request, response)
            val output = extractOutput(response)
            another.map { another =>
              mergeOutput(output, grab(another))
            } getOrElse output
          }
          grab(x)
        }
    }


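This works (but we should not be materializing anything or Await-ing at this point).
3. I looked at http://doc.akka.io/docs/akka/2.4/scala/stream/stream-graphs.html#Graph_cycles__liveness_and_deadlocks, which I believe contains the answer, but I cannot seem to find it there. In my case I would expect the cycle to contain one element most of the time, so neither a buffer overflow nor complete starvation should happen, yet evidently one of them does.
4. I tried debugging the stream with .withAttributes(Attributes(LogLevels(...))), but it does not produce any output, even though the logger appears to be configured correctly.
I am looking for hints on how to fix the flow method while keeping the same signature and semantics (so that the test passes).
Or am I doing something completely off base here (e.g. is there an existing feature in akka-stream-contrib that solves this)?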

irtuqstp #1

I think it is much safer to use Source.unfold than to build a custom graph.

    override def getArticles(lastTokenOpt: Option[String], filterIds: (Seq[Id]) => Seq[Id]): Source[Either[String, ImpArticle], NotUsed] = {
      val maxRows = 1000
      def getUri(cursor: String, count: Int) = s"/works?rows=$count&filter=type:journal-article&order=asc&sort=deposited&cursor=${URLEncoder.encode(cursor, "UTF-8")}"

      // The unfold state is the cursor; an empty cursor means there is nothing left to fetch.
      Source.unfoldAsync(lastTokenOpt.getOrElse("*")) { cursor =>
        println(s"Getting ${getUri(cursor, maxRows)}")
        if (cursor.nonEmpty) {
          sendGetRequest[CrossRefResponse[CrossRefList[JsValue]]](getUri(cursor, maxRows)).map {
            case Some(response) =>
              response.message match {
                case Left(list) if response.status == "ok" =>
                  println(s"Got ${list.items.length} items")
                  val items = list.items.flatMap { js =>
                    try {
                      parseArticle(js)
                    } catch {
                      case ex: Throwable =>
                        logger.error(s"Error on parsing: ${js.compactPrint}")
                        throw ex
                    }
                  }
                  list.`next-cursor` match {
                    case Some(nextCursor) =>
                      // Emit the articles followed by the cursor itself.
                      Some(nextCursor -> (items.map(Right.apply).toList ::: List(Left(nextCursor))))
                    case None =>
                      logger.error(s"`next-cursor` is missing when fetching from CrossRef [status ${response.status}][${getUri(cursor, maxRows)}]")
                      Some("" -> items.map(Right.apply).toList)
                  }
                case Left(jsvalue) if response.status != "ok" =>
                  logger.error(s"API error on fetching data from CrossRef [status ${response.status}][${getUri(cursor, maxRows)}]")
                  None
                case Right(someError) =>
                  val cause = someError.fold(errors => errors.map(_.message).mkString(", "), ex => ex.message)
                  logger.error(s"API error on fetching data from CrossRef [status $cause][${getUri(cursor, maxRows)}]")
                  None
              }
            case None =>
              logger.error(s"Got error on fetching ${getUri(cursor, maxRows)} from CrossRef")
              None
          }
        } else
          Future.successful(None)
      }.mapConcat(identity)
    }

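In your case you probably do not even need to push the cursor into the stream; I do that because I store the last successful cursor in a database so that I can resume after a failure.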
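Stripped of the CrossRef specifics, the unfold pattern comes down to carrying the next request as state and stopping once there is none. The following is a minimal sketch of that idea, not the code from this answer: `Request` and `Response` are copied from the question's test, while `UnfoldPaging`, `fetch` and `pages` are hypothetical names, and `fetch` is an in-memory stand-in for a real paged service. It assumes Akka 2.6 or later, where an implicit ActorSystem is enough to run a stream.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object UnfoldPaging {
  case class Request(max: Int, batchSize: Int, offset: Option[Int] = None)
  case class Response(values: List[Int], nextOffset: Option[Int])

  // Hypothetical in-memory stand-in for the paged service
  // (same logic as the flow in the question's test).
  def fetch(request: Request): Response = {
    val start = request.offset.getOrElse(0)
    val end = math.min(request.max, start + request.batchSize)
    Response((start until end).toList, if (end == request.max) None else Some(end))
  }

  // The state is the next request to send, or None once the last page was emitted.
  def pages(first: Request): Source[Response, NotUsed] =
    Source.unfold(Option(first)) {
      case Some(request) =>
        val response = fetch(request)
        val next = response.nextOffset.map(o => request.copy(offset = Some(o)))
        Some((next, response)) // (next state, element to emit)
      case None =>
        None // no further request: the source completes
    }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("unfold-paging")
    try {
      val values = Await.result(
        pages(Request(max = 35, batchSize = 10)).mapConcat(_.values).runWith(Sink.seq),
        5.seconds)
      println(values.size) // prints 35
    } finally system.terminate()
  }
}
```

Note that Akka's unfold expects the tuple as (next state, element), which is the reverse of the order used by the Scala standard library's `Iterator.unfold`. With a real service that returns a Future, `Source.unfoldAsync` takes the same shape of function wrapped in a Future.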

xfb7svmp #2

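It feels like this video covers the gist of what you are trying to do. They create a custom GraphStage that maintains state and sends it back to the server; the response stream depends on the state that was sent back, and they also have an event that signals completion (in your case, that would be where you check …).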

htzpubme #3

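You can use Source.unfoldAsync.
I prepared a simple project that sends a request to a REST API, extracts the data and next_page_url from the response, and uses next_page_url to fetch the next chunk of data. This repeats until next_page_url is None. At each step the stream accumulates data, and at the end all collected data is combined into a single Seq.
The full source code is available on GitHub.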

    class CatsHttpClientImpl(implicit system: ActorSystem[_], ec: ExecutionContext) extends CatsHttpClient {
      private val logger: Logger = LoggerFactory.getLogger(classOf[CatsHttpClientImpl])
      private val start: Option[String] = Some("https://catfact.ninja/breeds")

      override def getAllBreads: Future[Seq[Cat]] = {
        Source
          .unfoldAsync(start) {
            case Some(next) =>
              val nextChunkFuture: Future[CatsResponse] = sendRequest(next)
              nextChunkFuture.map { resp =>
                resp.nextPageUrl match {
                  case Some(url) => Some((Some(url), resp.data))
                  case None => Some((None, resp.data))
                }
              }
            case None => Future.successful(None)
          }
          .runWith(Sink.fold(Seq(): Seq[Cat])(_ ++ _))
      }

      private def sendRequest(url: String): Future[CatsResponse] = {
        logger.info(s"CatsHttpClientImpl: Sending request $url")
        val request = HttpRequest(
          uri = Uri(url),
          headers = List(
            RawHeader("Accept", "application/json")
          )
        )
        Http(system).singleRequest(request).flatMap { response =>
          response.status match {
            case StatusCodes.OK =>
              logger.info("CatsHttpClientImpl: Received success")
              Unmarshal(response.entity).to[CatsResponse]
            case _ =>
              logger.error("CatsHttpClientImpl: Received error")
              throw new CatsHttpClientException()
          }
        }
      }
    }
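The same accumulate-until-there-is-no-next-page idea can also be fitted to the `flow` signature from the question without materializing anything inside the flow. The cycle in the question cannot complete on its own: by default `Merge` waits for all of its inputs to finish, and the feedback input only finishes after `Merge` itself has. The sketch below avoids the cycle by using recursive `flatMapConcat` instead of `unfoldAsync`. It is one possible approach under two assumptions: `inputFlow` emits exactly one response per request, and the number of pages per request is moderate, since each page adds one level of nested substream. `PagedRequestsFlow` and `pages` are hypothetical names.

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}

object PagedRequestsFlow {
  def flow[Request, Response, Output](
      inputFlow: Flow[Request, Response, NotUsed],
      anotherRequest: (Request, Response) => Option[Request],
      extractOutput: Response => Output,
      mergeOutput: (Output, Output) => Output
  ): Flow[Request, Output, NotUsed] = {

    // All pages reachable from `request`, in order. The recursion is lazy:
    // the follow-up source is only built once the previous response arrived.
    def pages(request: Request): Source[Response, NotUsed] =
      Source.single(request).via(inputFlow).flatMapConcat { response =>
        val rest = anotherRequest(request, response) match {
          case Some(next) => pages(next)
          case None       => Source.empty[Response]
        }
        Source.single(response).concat(rest)
      }

    // One merged Output per incoming Request, emitted in request order.
    // `reduce` fails on an empty stream, hence the one-response assumption.
    Flow[Request].flatMapConcat { request =>
      pages(request).map(extractOutput).reduce(mergeOutput)
    }
  }
}
```

Because every substream ends as soon as `anotherRequest` returns None, the outer flow completes when its upstream does, and each request yields exactly one merged output.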
