如何等待多个webclient flux请求完成?

monwx1rj  于 2021-06-29  发布在  Java
关注(0)|答案(2)|浏览(880)

我想:
订阅多个返回flux的端点并输出我收到的消息。
等待从所有端点输出所有消息,然后继续。
避免“一起”处理来自多个端点(例如flux.zip)的消息,因为这些端点将返回数量不均衡的消息,并且在逻辑上彼此之间没有连接
如果一个或多个端点生成无限数量的消息,则永远阻止
以下代码满足1和3,但不满足2和4:

Stream.of("http://service1.com", "http://service2.com", "http://service3.com")
                .forEach(service -> {
                    webClient.get()
                            .retrieve()
                            .bodyToFlux(String.class)
                            .map(message -> service + ": " + message)
                            .subscribe(System.out::println);
                });

        System.out.println("Received all messages");

在所有端点完成之前,不应打印行“received all messages”。但是,因为 subscribe 如果是异步的,则几乎立即打印该行,并且我的应用程序将继续而不是等待。
我应该怎么做?

yjghlzjz

yjghlzjz1#

flatmap是将通量合并在一起的一种方法,但也可以使用flux.merge

List<Flux<String>> individualResults =
        Stream.of("http://service1.com", "http://service2.com", "http://service3.com")
            .map(
                service ->
                    webClient //
                        .get()
                        .retrieve()
                        .bodyToFlux(String.class))
            .collect(toList());
    Flux<String> mergedResults = Flux.merge(individualResults);  // Will not complete until all individual Fluxes have completed.
    mergedResults //
        .doOnNext(System.out::println)
        .then()
        .block(); // block this thread until mergedResults completes
    System.out.println("Received all messages");
9udxz4iz

9udxz4iz2#

我相信下面的代码片段在你的问题中达到了四分之三,尽管我觉得我没有完全理解第三个要求。让我看看这个例子是否满足了需要,如果不满足,还缺少什么。

Flux.just("http://service1.com", "http://service2.com", "http://service3.com")
            .flatMap(url -> webClient.get()
                    .uri(url)
                    .retrieve()
                    .bodyToFlux(String.class)
                    .map(body -> url + ":" + body)
            )
            .collectList()
            .doOnNext(list -> LOG.info("Received all messages"))
            .subscribe(list -> LOG.info("" + list));

相关问题