Spring Boot 从Flux.flatMap()迁移到Flux.map()会中断执行

atmip9wb  于 2023-02-16  发布在  Spring
关注(0)|答案(1)|浏览(183)

我开始将Spring WebFlux集成到我们的项目中,最近遇到了一个关于Flux.flatMap()的问题,下面的代码总结了这个问题:

@Component
@RequiredArgsConstructor
public class Client {

    private final ParameterizedTypeReference<List<ResponseDto>> TYPE = new ParameterizedTypeReference<>() {
    };

    private final WebClient webClient;

    public Mono<List<ResponseDto>> queryData(List<RequestDto> request) {
        return webClient.post()
                .uri("/api/query-data")
                .bodyValue(request)
                .retrieve()
                .bodyToMono(TYPE);
    }
}

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
class DemoApplicationTest {
    static final int COUNT = 1000_000;

    @Autowired
    private Client client;

    @Test
    void requestsAreSent() {
        List<RequestDto> requests = getRequests();
        List<List<RequestDto>> partition = Lists.partition(requests, 1000);
        Flux.fromIterable(partition)
                .flatMap(client::queryData)
                .collectList()
                .block();
    }

    @Test
    void requestsAreNotSent() {
        List<RequestDto> requests = getRequests();
        List<List<RequestDto>> partition = Lists.partition(requests, 1000);
        Flux.fromIterable(partition)
                .map(client::queryData)
                .collectList()
                .block();
    }

    private static List<RequestDto> getRequests() {
        return IntStream.range(0, COUNT)
                .boxed()
                .map(operand -> RequestDto.builder().build())
                .collect(Collectors.toList());
    }
}

如果我使用Flux.flatMap(),我会看到我的测试向webserver发送请求,但是如果我使用Flux.map(),则不会发送请求。
有人能解释这种行为吗?是固有的还是我配置错了?

nuypyhwy

nuypyhwy1#

MonoFlux这样的被动发布者是懒惰的。这意味着除非你订阅它们,否则它们不会被执行。在你的requestsAreSent()requestsAreNotSent()方法中,你只是**通过使用block()方法订阅外部发布者。
requestsAreSent()方法中发送请求的原因是因为你使用了flatMap(),这个操作将内部发布者从client.queryData()扁平化到外部发布者,这意味着如果你订阅了外部观察对象,你也订阅了内部观察对象。
另一方面,在您的requestsAreNotSent()方法中,发布者不是扁平的,并且因为您只订阅外部的可观察对象,所以client.queryData()中的逻辑不会执行。
例如,如果你订阅了所有这些内在可观察性,那么逻辑就会执行:

List<Mono<List<ResponseDto>>> list = Flux.fromIterable(partition)
    .map(client::queryData)
    .collectList()
    .block();
list.forEach(Mono::block); // Subscribing to the inner observables

但是,这是一个不好的做法,您应该继续使用flatMap()

相关问题