Akka Streams recursive flow call

Posted by jfgube3f on 2024-01-08 in Other

I am trying to implement pagination with Akka Streams.

    case class SomeObject(id: Long, next_page: Option[Map[String, String]])

    // Fetches one page; the returned Uri is the state for the next call (Uri.Empty after the last page).
    def chainRequests(uri: Uri): Future[Option[(Uri, T)]] =
      if (uri.isEmpty) Future.successful(None)
      else {
        val response: Future[Response[T]] =
          sendWithRetry(prepareRequest(HttpMethods.GET, uri)).flatMap(unmarshal)
        response.map { resp =>
          resp.next_page match {
            case Some(next_page) => Some((Uri(next_page("uri")), resp.data))
            case _               => Some((Uri.Empty, resp.data))
          }
        }
      }

    Source.single(someObject)
      .map(obj => Uri(s"object/${obj.id}"))
      .flatMapConcat(uri => Source.unfoldAsync(uri)(chainRequests))
      .map(...) // some processing goes here

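The problem is that if I call source.take(1000) and the pagination has many elements (pages), downstream does not receive any new elements until Source.unfoldAsync has completed.
I also tried using a cycle in the flow: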

    val in = builder.add(Flow[Uri])
    val out = builder.add(Flow[T])
    // Route responses that have a next page to port 1, the last page to port 0.
    val partition = builder.add(Partition[Response[T]](2, r => if (r.next_page.isDefined) 1 else 0))
    val mergeUri = builder.add(MergePreferred[Uri](1))
    in ~> mergeUri ~> sendRequest ~> partition
    mergeUri.preferred <~ extractNextUri <~ partition.out(1)
    partition.out(0) ~> Flow[Response[T]].map(_.data) ~> out
    FlowShape(in.in, out.out)


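But the code above does not work.
I have been trying to create my own GraphStage. unfoldAsync needs a first element, but in the Flow solution I have no "first" element. Any suggestions?
Thanks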

Answer #1 (qlfbtfca)

I found a solution by writing my own GraphStage:

    import akka.http.scaladsl.model.Uri
    import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
    import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
    import scala.concurrent.{ExecutionContextExecutor, Future}
    import scala.util.{Failure, Success, Try}

    final class PaginationGraphStage[S <: Uri, E](f: S => Future[Option[(S, E)]])(implicit ec: ExecutionContextExecutor)
        extends GraphStage[FlowShape[S, E]] {
      val in: Inlet[S] = Inlet[S]("PaginationGraphStage.in")
      val out: Outlet[E] = Outlet[E]("PaginationGraphStage.out")
      override val shape: FlowShape[S, E] = FlowShape.of(in, out)

      override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
        new GraphStageLogic(shape) with InHandler with OutHandler {
          // Next page of the chain currently being followed; None when idle.
          private[this] var state: Option[S] = None
          // True while a call to f is outstanding.
          private[this] var inFlight = false

          private def futureCompleted(result: Try[Option[(S, E)]]): Unit = {
            inFlight = false
            result match {
              case Failure(ex) => failStage(ex)
              case Success(Some((next, elem))) =>
                // An empty Uri marks the last page of the current chain.
                state = if (next.isEmpty) None else Some(next)
                push(out, elem)
                if (state.isEmpty && isClosed(in)) completeStage()
              case Success(None) =>
                // Nothing was emitted, so downstream demand is still open: move on.
                state = None
                if (isClosed(in)) completeStage() else pull(in)
            }
          }
          private val futureCB = getAsyncCallback[Try[Option[(S, E)]]](futureCompleted)

          private def fetch(s: S): Unit = {
            inFlight = true
            val future = f(s)
            future.value match {
              case Some(v) => futureCompleted(v) // already completed, skip the callback
              case None    => future.onComplete(futureCB.invoke)
            }
          }

          // Downstream demand: continue the current chain or ask upstream for a new start URI.
          override def onPull(): Unit = state match {
            case Some(s) => fetch(s)
            case None    => if (isClosed(in)) completeStage() else if (!hasBeenPulled(in)) pull(in)
          }

          // Upstream is only pulled while downstream is waiting, so fetch right away.
          override def onPush(): Unit = fetch(grab(in))

          // Finish only once the current chain has been drained.
          override def onUpstreamFinish(): Unit =
            if (!inFlight && state.isEmpty) completeStage()

          setHandlers(in, out, this)
        }
    }
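
For comparison, a Flow with the same shape can probably be assembled from built-in operators as well: flatMapConcat turns every incoming element into the "first" element of its own Source.unfoldAsync. The following is only a minimal sketch, assuming Akka 2.6 or later; the Page class, the in-memory pages map and the fetch function are hypothetical stand-ins for the real HTTP calls and are not part of the original answer.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object FlatMapConcatPagination extends App {
  implicit val system: ActorSystem = ActorSystem("pagination-sketch")
  import system.dispatcher

  // Hypothetical page model: the items on a page plus the key of the next page, if any.
  final case class Page(items: List[Int], next: Option[String])

  // Hypothetical in-memory "API" with two independent chains of pages.
  val pages: Map[String, Page] = Map(
    "a/1" -> Page(List(1, 2), Some("a/2")),
    "a/2" -> Page(List(3), None),
    "b/1" -> Page(List(10), None)
  )

  // Hypothetical stand-in for the HTTP request.
  def fetch(key: String): Future[Page] = Future(pages(key))

  // Every incoming key seeds its own unfoldAsync; the state is the next key to fetch.
  val paginate: Flow[String, List[Int], NotUsed] =
    Flow[String].flatMapConcat { start =>
      Source.unfoldAsync[Option[String], List[Int]](Some(start)) {
        case Some(key) => fetch(key).map(page => Some((page.next, page.items)))
        case None      => Future.successful(None) // chain finished
      }
    }

  val result: Seq[List[Int]] =
    Await.result(Source(List("a/1", "b/1")).via(paginate).runWith(Sink.seq), 5.seconds)
  println(result)
  system.terminate()
}
```

With this sketch the pages of chain "a" are emitted first, followed by the page of chain "b"; whether it can replace a custom stage depends on how much control over buffering and completion is needed.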

Answer #2 (evrscar2)

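Source.unfoldAsync is what you are looking for.
I prepared a simple project that walks through all pages of a REST API and accumulates the results of all pages, returning a Future with a Seq.
You can find the complete source code and project over on GitHub.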

    import akka.actor.typed.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model.{HttpRequest, StatusCodes, Uri}
    import akka.http.scaladsl.model.headers.RawHeader
    import akka.http.scaladsl.unmarshalling.Unmarshal
    import akka.stream.scaladsl.{Sink, Source}
    import org.slf4j.{Logger, LoggerFactory}
    import scala.concurrent.{ExecutionContext, Future}

    // CatsHttpClient, Cat, CatsResponse, CatsHttpClientException and the JSON unmarshaller
    // are defined elsewhere in the project.
    class CatsHttpClientImpl(implicit system: ActorSystem[_], ec: ExecutionContext) extends CatsHttpClient {
      private val logger: Logger = LoggerFactory.getLogger(classOf[CatsHttpClientImpl])
      private val start: Option[String] = Some("https://catfact.ninja/breeds")

      override def getAllBreads: Future[Seq[Cat]] =
        Source
          .unfoldAsync(start) {
            // State is the URL of the next page; None means the previous page was the last one.
            case Some(next) =>
              val nextChunkFuture: Future[CatsResponse] = sendRequest(next)
              nextChunkFuture.map { resp =>
                resp.nextPageUrl match {
                  case Some(url) => Some((Some(url), resp.data))
                  case None      => Some((None, resp.data))
                }
              }
            case None => Future.successful(None)
          }
          // Concatenate the data of all pages into one Seq.
          .runWith(Sink.fold(Seq.empty[Cat])(_ ++ _))

      private def sendRequest(url: String): Future[CatsResponse] = {
        logger.info(s"CatsHttpClientImpl: Sending request $url")
        val request = HttpRequest(
          uri = Uri(url),
          headers = List(RawHeader("Accept", "application/json"))
        )
        Http(system).singleRequest(request).flatMap { response =>
          response.status match {
            case StatusCodes.OK =>
              logger.info("CatsHttpClientImpl: Received success")
              Unmarshal(response.entity).to[CatsResponse]
            case _ =>
              logger.error("CatsHttpClientImpl: Received error")
              throw new CatsHttpClientException()
          }
        }
      }
    }
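
To see how Source.unfoldAsync reacts to downstream demand without calling a real API, the idea can be tried against a simulated endpoint. This is a minimal sketch, assuming Akka 2.6 or later; fetchPage, lastPage and the fetched counter are hypothetical names introduced only for illustration and do not come from the project above.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object UnfoldAsyncDemand extends App {
  implicit val system: ActorSystem = ActorSystem("unfold-sketch")
  import system.dispatcher

  val lastPage = 100
  val fetched = new AtomicInteger(0) // counts the simulated requests

  // Hypothetical stand-in for an HTTP call: one item per page plus the next page number, if any.
  def fetchPage(n: Int): Future[(List[String], Option[Int])] = Future {
    fetched.incrementAndGet()
    (List(s"item-$n"), if (n < lastPage) Some(n + 1) else None)
  }

  val items = Source
    .unfoldAsync[Option[Int], List[String]](Some(1)) {
      case Some(n) => fetchPage(n).map { case (data, next) => Some((next, data)) }
      case None    => Future.successful(None) // no more pages
    }
    .mapConcat(page => page) // flatten pages into single items
    .take(3)                 // stop after three items

  val result: Seq[String] = Await.result(items.runWith(Sink.seq), 5.seconds)
  println(result)
  println(s"requests made: ${fetched.get}")
  system.terminate()
}
```

Each page is emitted as soon as its Future completes, and take(3) cancels the source once it has enough items, so the simulated endpoint should be called far fewer than lastPage times.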
