How can I use Spring WebClient to make non-blocking calls and send an email after all calls have completed?

7cjasjjr posted on 2021-07-15 in Java

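I am using Spring's WebClient and Project Reactor to make non-blocking calls to a list of URLs. My requirements are:
- Asynchronously call GET on a list of URLs
- Log each URL when it is called
- Log the URL of any call that results in an exception
- Log the URL of any successful call
- Log the URL of any call that results in a non-2xx HTTP status
- Send an email containing the list of URLs whose calls resulted in an exception or a non-2xx HTTP status
Here is my attempt: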

    List<Mono<ClientResponse>> restCalls = new ArrayList<>();
    List<String> failedUrls = new ArrayList<>();
    for (String serviceUrl : serviceUrls.getServiceUrls()) {
        restCalls.add(
            webClientBuilder
                .build()
                .get()
                .uri(serviceUrl)
                .exchange()
                .doOnSubscribe(c -> log.info("calling service URL {}", serviceUrl))
                .doOnSuccess(response -> log.info("{} success status {}", serviceUrl, response.statusCode().toString()))
                .doOnError(response -> {log.info("{} error status {}", serviceUrl, response); failedUrls.add(serviceUrl);}));
    }
    Flux.fromIterable(restCalls)
        .map((data) -> data.subscribe())
        .onErrorContinue((throwable, e) -> {
            log.info("Exception for URL {}", ((WebClientResponseException) throwable).getRequest().getURI());
            failedUrls.add(serviceUrl);
        })
        .collectList()
        .subscribe((data) -> {
            log.info("all called");
            email.send("Failed URLs are {}", failedUrls);
        });

The problem is that the email is sent before the calls have returned. How can I wait for all the URL calls to complete before calling email.send?

idfiyjo8 #1

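As noted in the comments, the main error in your example is the use of "subscribe": it launches the requests, but outside the context of the main flux, so you cannot get errors or results back from them.
subscribe is a kind of trigger operation on the pipeline; it is not meant for chaining.
Here is a complete example (except for the email, which is replaced by logging):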

    package fr.amanin.stackoverflow;

    import org.springframework.http.HttpStatus;
    import org.springframework.web.reactive.function.client.WebClient;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    import java.util.Arrays;
    import java.util.List;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    import java.util.stream.Collectors;

    public class WebfluxURLProcessing {

        private static final Logger LOGGER = Logger.getLogger("example");

        public static void main(String[] args) {
            final List<String> urls = Arrays.asList("https://www.google.com", "https://kotlinlang.org/kotlin/is/wonderful/", "https://stackoverflow.com", "http://doNotexists.blabla");
            final Flux<ExchangeDetails> events = Flux.fromIterable(urls)
                    // unwrap request async operations
                    .flatMap(url -> request(url))
                    // Add a side-effect to log results
                    .doOnNext(details -> log(details))
                    // Keep only results that show an error
                    .filter(details -> details.status < 0 || !HttpStatus.valueOf(details.status).is2xxSuccessful());
            sendEmail(events);
        }

        /**
         * Mock emails by collecting all events in a text and logging it.
         * @param report asynchronous flow of responses
         */
        private static void sendEmail(Flux<ExchangeDetails> report) {
            final String formattedReport = report
                    .map(details -> String.format("Error on %s. status: %d. Reason: %s", details.url, details.status, details.error.getMessage()))
                    // collecting (or reducing, folding, etc.) allows to gather all upstream results to use them as a single value downstream.
                    .collect(Collectors.joining(System.lineSeparator(), "REPORT:"+System.lineSeparator(), ""))
                    // In a real-world scenario, replace this with a subscribe or chaining to another reactive operation.
                    .block();
            LOGGER.info(formattedReport);
        }

        private static void log(ExchangeDetails details) {
            if (details.status >= 0 && HttpStatus.valueOf(details.status).is2xxSuccessful()) {
                LOGGER.info("Success on: "+details.url);
            } else {
                LOGGER.log(Level.WARNING,
                        "Status {0} on {1}. Reason: {2}",
                        new Object[]{
                                details.status,
                                details.url,
                                details.error == null ? "None" : details.error.getMessage()
                        });
            }
        }

        private static Mono<ExchangeDetails> request(String url) {
            return WebClient.create(url).get()
                    .retrieve()
                    // workaround to counter fail-fast behavior: create a special error that will be converted back to a result
                    .onStatus(status -> !status.is2xxSuccessful(), cr -> cr.createException().map(err -> new RequestException(cr.statusCode(), err)))
                    .toBodilessEntity()
                    .map(response -> new ExchangeDetails(url, response.getStatusCode().value(), null))
                    // Convert back custom error to result
                    .onErrorResume(RequestException.class, err -> Mono.just(new ExchangeDetails(url, err.status.value(), err.cause)))
                    // Convert errors that shut connection before server response (cannot connect, etc.) to a result
                    .onErrorResume(Exception.class, err -> Mono.just(new ExchangeDetails(url, -1, err)));
        }

        public static class ExchangeDetails {
            final String url;
            final int status;
            final Exception error;

            public ExchangeDetails(String url, int status, Exception error) {
                this.url = url;
                this.status = status;
                this.error = error;
            }
        }

        private static class RequestException extends RuntimeException {
            final HttpStatus status;
            final Exception cause;

            public RequestException(HttpStatus status, Exception cause) {
                this.status = status;
                this.cause = cause;
            }
        }
    }
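
For readers who want to try the two ideas above without a Spring or Reactor dependency (turn every failure into an ordinary result so one bad URL does not stop the others, then gather everything before acting), here is a rough JDK-only analogue. To be clear, this is not Reactor code and not the approach from the question: it swaps in `CompletableFuture`, and the names `AwaitAllCalls`, `CallResult`, `failedUrls` and `fakeCall`, as well as the example URLs, are made up for illustration. `fakeCall` only simulates an HTTP call.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

public class AwaitAllCalls {

    /** Outcome of one call; status is -1 when no response was received (hypothetical type). */
    static final class CallResult {
        final String url;
        final int status;

        CallResult(String url, int status) {
            this.url = url;
            this.status = status;
        }

        boolean failed() {
            return status < 200 || status >= 300;
        }
    }

    /** Starts every call, then completes with the failed URLs once all calls have finished. */
    public static CompletableFuture<List<String>> failedUrls(
            List<String> urls, Function<String, CompletableFuture<Integer>> call) {
        List<CompletableFuture<CallResult>> pending = urls.stream()
                .map(url -> call.apply(url)
                        .thenApply(status -> new CallResult(url, status))
                        // Convert an exception into a result so it cannot abort the other calls
                        .exceptionally(err -> new CallResult(url, -1)))
                .collect(Collectors.toList());

        // allOf completes only when every pending call has completed
        return CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]))
                .thenApply(done -> pending.stream()
                        .map(CompletableFuture::join) // does not block here: all futures are already done
                        .filter(CallResult::failed)
                        .map(result -> result.url)
                        .collect(Collectors.toList()));
    }

    /** Simulated HTTP call (an assumption for this sketch, no network access involved). */
    public static CompletableFuture<Integer> fakeCall(String url) {
        if (url.contains("unreachable")) {
            return CompletableFuture.failedFuture(new IllegalStateException("cannot connect"));
        }
        return CompletableFuture.supplyAsync(() -> url.contains("missing") ? 404 : 200);
    }

    public static void main(String[] args) {
        List<String> urls = List.of(
                "http://ok.example", "http://missing.example", "http://unreachable.example");
        failedUrls(urls, AwaitAllCalls::fakeCall)
                // The "send email" step is chained, so it runs only after all calls are done
                .thenAccept(failed -> System.out.println("Failed URLs are " + failed))
                .join(); // wait at the edge of the program only, much like block() above
    }
}
```

The point is the same as in the Reactor version: the final action is chained onto the aggregated result instead of being fired from a separate subscription, so it cannot run before the calls have finished.
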
efzxgjgh #2

I haven't tested this, but it should work:

    public void check() {
        List<Flux<String>> restCalls = new ArrayList<>();
        for (String serviceUrl : serviceUrls.getServiceUrls()) {
            // getForEntity is the service method shown further down; it takes only the URL
            restCalls.add(rest.getForEntity(serviceUrl));
        }
        Flux.fromIterable(restCalls)
            // note: blockFirst() blocks the subscribing thread until each call returns
            .map((data) -> data.blockFirst())
            .onErrorContinue((throwable, e) -> {
                ((WebClientResponseException) throwable).getRequest().getURI(); // get the failing URI
                // do whatever you need with the failed service
            })
            .collectList() // Collects all the results into a list
            .subscribe((data) -> {
                // from here do whatever is needed from the results
            });
    }

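If you haven't done so already, your service calls must be non-blocking, so you should change their return type to Flux. The method in your REST service should look something like this: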

    public Flux<String> getForEntity(String name) {
        return this.webClient.get().uri("url", name)
            .retrieve().bodyToFlux(String.class);
    }

Hope this helps.

hpcdzsge #3

    restCalls.add(
        webClientBuilder
            .build()
            .get()
            .uri(serviceUrl)
            .exchange()
            .doOnSubscribe(c -> log.info("calling service URL {}", serviceUrl))
            .doOnSuccess(response -> log.info("{} success status {}", serviceUrl, response.statusCode().toString()))
            .doOnError(response -> {log.info("{} error status {}", serviceUrl, response); failedUrls.add(serviceUrl);}));
    Flux.fromIterable(restCalls)
        .map((data) -> data.subscribe())
        .onErrorContinue((throwable, e) -> {
            log.info("Exception for URL {}", ((WebClientResponseException) throwable).getRequest().getURI());
            failedUrls.add(serviceUrl);
        })
        .collectList()
        .subscribeOn(Schedulers.elastic())
        .subscribe((data) -> {
            log.info("all called");
            email.send("Failed URLs are {}", failedUrls);
        });