scala中cats.effect的io.async回调问题

wdebmtf2  于 2021-07-14  发布在  Java
关注(0)|答案(1)|浏览(482)

我试图通过java11重写httpclient HttpClient 在斯卡拉
这是我的密码:

  1. import cats.effect._
  2. import java.net.http._
  3. import java.net.http.HttpResponse._
  4. import java.net.http.HttpClient._
  5. trait HttpClients[F[_]] {
  6. def send(req: HttpRequest)(implicit F: Async[F]): F[HttpResponse[_]]
  7. }
  8. object HttpClients {
  9. val client: HttpClient = HttpClient.newBuilder().followRedirects(Redirect.ALWAYS).build()
  10. def newClient[F[_] : Async](): HttpClients[F] = new HttpClients[F] {
  11. override def send(req: HttpRequest)(implicit F: Async[F]): F[HttpResponse[_]] = F.async { cb =>
  12. val resp = client.sendAsync(req, BodyHandlers.ofString())
  13. val s = resp.handle((res: HttpResponse[String], err: Throwable) => {
  14. if (err == null)
  15. cb(Right(res))
  16. else
  17. cb(Left(err))
  18. })
  19. s // TODO ?
  20. // Type missmatch
  21. // Required: F[Option[F[Unit]]]
  22. // Found: Unit
  23. }
  24. }
  25. }

来自此的句柄回调
我猜错误来自这里,但我不知道下一步该怎么写。
然后我做了一些改变:

  1. def newClient[F[_] : Async](): HttpClients[F] = new HttpClients[F] {
  2. override def send(req: HttpRequest)(implicit F: Async[F]): F[HttpResponse[_]] = F.async[HttpResponse[_]] { cb =>
  3. val s = Sync[F](F: Async[F]).delay {
  4. val resp = client.sendAsync(req, BodyHandlers.ofString())
  5. resp.handle((res: HttpResponse[String], err: Throwable) => {
  6. if (err == null)
  7. cb(Right(res))
  8. else
  9. cb(Left(err))
  10. }).join()
  11. }
  12. F.delay(s.some)
  13. }
  14. }

这一次,没有错误,但我不知道如何得到响应的身体
谢谢你的回复!

xxb16uws

xxb16uws1#

@olegpyzhcov已经提供了在您使用ce3的情况下的洞察力,这个答案是在您想要的情况下使用ce2。
代码的第一个版本是正确的,下面是一个完整的运行示例,使用ammonite进行了一些样式改进,并确保为每个调用和对 newClient ```
// scala 2.13.5

import $ivy.org.typelevel::cats-effect:2.5.0

import cats.effect.{Async, IO}
import cats.syntax.all._
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

trait HttpClients[F[_]] {
def send(req: HttpRequest): F[HttpResponse[String]]
}

object HttpClients {
def newClient[F[_]](implicit F: Async[F]): F[HttpClients[F]] =
F.delay {
HttpClient
.newBuilder
.followRedirects(HttpClient.Redirect.ALWAYS)
.build()
} map { client =>
new HttpClients[F] {
override def send(req: HttpRequest): F[HttpResponse[String]] =
F.async { cb =>
client.sendAsync(req, HttpResponse.BodyHandlers.ofString).handle {
(res: HttpResponse[String], err: Throwable) =>
if (err == null) cb(Right(res))
else cb(Left(err))
}
}
}
}
}

object Main {
private val request =
HttpRequest
.newBuilder
.GET
.uri(URI.create("https://stackoverflow.com/questions/tagged/scala?tab=Newest"))
.build()

private val program = for {
_ <- IO.delay(println("Hello, World!"))
client <- HttpClients.newClient[IO]
response <- client.send(request)
_ <- IO.delay(println(response))
_ <- IO.delay(println(response.body))
} yield ()

def run(): Unit = {
program.unsafeRunSync()
}
}

@main
def main(): Unit = {
Main.run()
}

展开查看全部

相关问题