springwebflux在React堆栈中使用阻塞的httpclient

41ik7eoe  于 2021-07-13  发布在  Java
关注(0)|答案(1)|浏览(452)

我目前正在进行一个构建微服务的项目,并且正在尝试从更传统的spring boot转变 RestClient 使用netty和 WebClient 作为http客户端,以便连接到后端系统。
这对于带有restapi的后端来说是很好的,但是实现起来仍然有一些困难 WebClient 连接到soap后端和oracle数据库的服务,后者仍然使用传统的jdbc。
我设法在网上找到了一些关于jdbc调用的解决方法,这些方法使用并行调度程序来发布阻塞jdbc调用的结果:

//the method that is called by @Service
@Override
public Mono<TransactionManagerModel> checkTransaction(String transactionId, String channel, String msisdn) {
    return asyncCallable(() -> checkTransactionDB(transactionId, channel, msisdn))
            .onErrorResume(error -> Mono.error(error));
}

...

//the actual JDBC call
private TransactionManagerModel checkTransactionDB(String transactionId, String channel, String msisdn) {
...
    List<TransactionManagerModel> result = 
                    jdbcTemplate.query(CHECK_TRANSACTION, paramMap, new BeanPropertyRowMapper<>(TransactionManagerModel.class));
...
}

//Generic async callable
private <T> Mono<T> asyncCallable(Callable<T> callable) {
    return Mono.fromCallable(callable).subscribeOn(Schedulers.parallel()).publishOn(transactionManagerJdbcScheduler);
}

我觉得这很管用。
而对于soap调用,我所做的是将soap调用封装在 Mono 而soap调用本身正在使用 CloseableHttpClient 这显然是一个阻塞的http客户端。

//The method that is being 'reactive'
public Mono<OfferRs> addOffer(String transactionId, String channel, String serviceId, OfferRq request) {
...
    OfferRs result = adapter.addOffer(transactionId, channel, generateRequest(request));
...
}

//The SOAP adapter that uses blocking HTTP Client
public OfferRs addOffer(String transactionId, String channel, JAXBElement<OfferRq> request) {
...
    response = (OfferRs) getWebServiceTemplate().marshalSendAndReceive(url, request, webServiceMessage -> {
            try {
                SoapHeader soapHeader = ((SoapMessage) webServiceMessage).getSoapHeader();

                ObjectFactory headerFactory = new ObjectFactory();
                AuthenticationHeader authHeader = headerFactory.createAuthenticationHeader();
                authHeader.setUserName(username);
                authHeader.setPassWord(password);

                JAXBContext headerContext = JAXBContext.newInstance(AuthenticationHeader.class);
                Marshaller marshaller = headerContext.createMarshaller();
                marshaller.marshal(authHeader, soapHeader.getResult());
            } catch (Exception ex) {
                log.error("Failed to marshall SOAP Header!", ex);
            }
        });
        return response;
...
}

我的问题是:这个soap调用的实现是否足够“被动”,以至于我不必担心某些调用在微服务的某个部分被阻塞?我已经实现了React式堆栈调用 block() 显式将抛出异常,因为使用netty时不允许这样做。
或者我应该适应平行的用法 Schedulers 在soap调用中也是如此?

lh80um4z

lh80um4z1#

经过一番讨论,我会写一个答案。
reactor文档说明您应该在它们自己的调度程序上放置阻塞调用。这基本上是为了保持reactor的非阻塞部分继续运行,如果阻塞中出现了某些内容,reactor将退回到传统的servlet行为,这意味着为每个请求分配一个线程。
reactor有非常好的关于调度器的文档,它们的类型等等。
但简短地说:

未订阅

当有人订阅时,React堆将进入一个叫做 assembly phase 这意味着它将基本上从订阅点开始向后向上游调用操作符,直到找到一个数据生产者(例如一个数据库或另一个服务等)。如果它找到一个 onSubscribe -操作符在这个阶段的某个地方,它将把整个链放在自己定义的位置上 Scheduler . 所以要知道的一件好事是 onSubscribe 其实没关系,只要是在 assembly phase 整个链条都会受到影响。
示例用法可以是:
我们有对数据库的阻塞调用,使用阻塞rest客户机的慢速调用,在阻塞庄园中从系统读取文件等等。

发布时

如果你有 onPublish 在战争中的某个地方 assembly phase 链将知道它放置在何处,链将在该特定点从默认调度程序切换到指定的调度程序。所以呢 onPublish 位置很重要。因为它会在放置的位置切换。这个操作符更能控制您要在代码中的特定点上的特定调度器上放置某些内容。
使用示例如下:
如果你在某个特定的点上进行了大量的阻塞cpu计算,你可以切换到 Scheduler.parallell() 这将保证所有的计算都将被放置在不同的内核上,这样做会产生大量的cpu工作,当您完成时,您可以切换回默认的调度程序。

上面的例子

您的soap调用应该独立进行 Scheduler 如果他们挡住了我想 onSubscribe 只要使用 Schedulers.elasticBound() 使用传统的servlet行为就可以了。如果您觉得害怕在同一个调度程序上执行每个阻塞调用,那么可以传入 SchedulerasyncCallable 函数和拆分调用以使用不同的 Schedulers .

相关问题