我使用spring的“webclient”和projectreactor对url列表进行非阻塞调用。我的要求是:
对URL列表异步调用get
在调用每个url时记录url
记录导致异常的调用的url
记录成功呼叫的url
记录导致非2xx http状态的调用的url
发送一封电子邮件,其中包含调用导致异常或非2xx http状态的URL列表
以下是我的尝试:
List<Mono<ClientResponse>> restCalls = new ArrayList<>();
List<String> failedUrls = new ArrayList<>();
for (String serviceUrl : serviceUrls.getServiceUrls()) {
restCalls.add(
webClientBuilder
.build()
.get()
.uri(serviceUrl)
.exchange()
.doOnSubscribe(c -> log.info("calling service URL {}", serviceUrl))
.doOnSuccess(response -> log.info("{} success status {}", serviceUrl, response.statusCode().toString()))
.doOnError(response -> {log.info("{} error status {}", serviceUrl, response); failedUrls.add(serviceUrl);}));
}
Flux.fromIterable(restCalls)
.map((data) -> data.subscribe())
.onErrorContinue((throwable, e) -> {
log.info("Exception for URL {}", ((WebClientResponseException) throwable).getRequest().getURI());
failedUrls.add(serviceUrl);
})
.collectList()
.subscribe((data) -> {
log.info("all called");
email.send("Failed URLs are {}", failedUrls);
});
问题是电子邮件是在电话回复之前发送的。如何在调用之前等待所有URL调用完成 email.send
?
3条答案
按热度按时间idfiyjo81#
如comment中所述,您的示例中的主要错误是使用了“subscribe”,即启动查询,但与主流量无关,因此您无法返回错误或结果。
subscribe是管道上的一种触发器操作,它不用于链接。
下面是一个完整的示例(电子邮件除外,替换为日志记录):
efzxgjgh2#
我还没测试过这个,但应该有用
因此,如果您还没有这样做,您的服务调用必须是非阻塞的,因此您应该将类型转换为flux。所以在rest服务中,您的方法应该是这样的
希望有帮助
hpcdzsge3#