我们有异步功能,异步操作是使用akka http客户端完成的
class Foo[A,B] extends AsyncFunction[A, B] with {
val akkaConfig = ConfigFactory.load()
implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())
implicit lazy val system = ActorSystem("MyActorSystem", akkaConfig)
implicit lazy val materializer = ActorMaterializer()
def postReq(uriStr: String, str: String): Future[HttpResponse] = {
Http().singleRequest(HttpRequest(
method = HttpMethods.POST,
uri = uriStr,
entity = HttpEntity(ContentTypes.`application/json`, str))
)
}
override def asyncInvoke(input: A, resultFuture: ResultFuture[B]) : Unit = {
val resultFutureRequested: Future[HttpResponse] = postReq(...)
//the rest of the class ...
问题:
如果我想增加http请求的并行性-我应该使用akka config还是有办法通过flink.yamel进行配置
既然flink也在使用akka,那么创建 ActorSystem
以及 ExecutionContext
?
1条答案
按热度按时间dw1jzc5e1#
对于第一个问题,有三种不同的设置会影响性能和实际执行的请求数:
并行性,这将导致flink创建多个
AsyncFunction
包括多个HttpClient
.函数本身中的并发请求数。当你打电话的时候
orderedWait
或者unorderedWait
你应该提供capacity
在函数中,这将限制并发请求的数量。http客户端的实际设置。
如你所见,点2。和3。连接,因为flink可以限制可能的并发请求数,所以有时更改http客户机设置可能没有效果,因为请求数由flink intself限制。
提高产品的吞吐量
AsyncFunction
视情况而定。你要记住这一点AsyncFunction
在单线程中被调用。这基本上意味着,如果您调用的服务的响应时间很长,您只需阻塞等待响应的请求数,因此唯一的方法就是增加响应时间parallelism'
. 但是,通常,更改HttpClient
以及capacity
函数的使用应该可以让您获得更好的吞吐量。至于第二个问题,我认为创建多个
ActorSystems
. 您可以看到[此处]回答的类似问题。1