flink提高异步操作的并行性

jqjz2hbq  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(480)

我们有异步功能,异步操作是使用akka http客户端完成的

class Foo[A,B] extends AsyncFunction[A, B] with {
  val akkaConfig = ConfigFactory.load()
  implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())
  implicit lazy val system = ActorSystem("MyActorSystem", akkaConfig)
  implicit lazy val materializer = ActorMaterializer()
    def postReq(uriStr: String, str: String): Future[HttpResponse] = {
        Http().singleRequest(HttpRequest(
          method = HttpMethods.POST,
          uri = uriStr,
          entity = HttpEntity(ContentTypes.`application/json`, str))
        )
      }

 override def asyncInvoke(input: A, resultFuture: ResultFuture[B]) : Unit  = {
    val resultFutureRequested: Future[HttpResponse] = postReq(...)
//the rest of the class ...

问题:
如果我想增加http请求的并行性-我应该使用akka config还是有办法通过flink.yamel进行配置
既然flink也在使用akka,那么创建 ActorSystem 以及 ExecutionContext ?

dw1jzc5e

dw1jzc5e1#

对于第一个问题,有三种不同的设置会影响性能和实际执行的请求数:
并行性,这将导致flink创建多个 AsyncFunction 包括多个 HttpClient .
函数本身中的并发请求数。当你打电话的时候 orderedWait 或者 unorderedWait 你应该提供 capacity 在函数中,这将限制并发请求的数量。
http客户端的实际设置。
如你所见,点2。和3。连接,因为flink可以限制可能的并发请求数,所以有时更改http客户机设置可能没有效果,因为请求数由flink intself限制。
提高产品的吞吐量 AsyncFunction 视情况而定。你要记住这一点 AsyncFunction 在单线程中被调用。这基本上意味着,如果您调用的服务的响应时间很长,您只需阻塞等待响应的请求数,因此唯一的方法就是增加响应时间 parallelism' . 但是,通常,更改 HttpClient 以及 capacity 函数的使用应该可以让您获得更好的吞吐量。
至于第二个问题,我认为创建多个 ActorSystems . 您可以看到[此处]回答的类似问题。1

相关问题