java—生成分页的webclient请求并在流量中使用响应

mcvgt66p  于 2021-07-14  发布在  Java
关注(0)|答案(1)|浏览(440)

我向第三方web服务重复发出分页webclient请求。我现在的实现可以工作,但是阻塞了。
我目前的实施情况:

var elementsPerPage = 10;
Flux
    .generate(
        () -> 0,
        (pageIndex, emitter) -> {
            BlahServiceResponse blahServiceResponse =
                webClient
                    .get()
                    .uri("/blah?pageIndex={pageIndex}", pageIndex)
                    .retrieve()
                    .bodyToMono(BlahServiceResponse.class)
                    .block(); // Yuck!!!
            if (blahServiceResponse.getStudents().size() > 0) {
                emitter.next(blahServiceResponse);
            } else {
                emitter.complete();
            }
            return pageIndex + elementsPerPage;
        }
    )
    .subscribe(System.out::println); // Replace me with actual logic

出于可理解的原因,如果将上述代码更改为以下内容,则会引发“illegalstateexception:生成器未调用任何synchronoussink方法”异常:

webClient
    .get()
    ...
    .bodyToMono(BlahServiceResponse.class)
    .subscribe(emitter::next);

所以我开始寻找一个异步接收器,并意识到它是通量单接收器。但据我所知,flux中没有一个builder方法支持使用flux | monosink生成有状态元素。
我遗漏了什么吗?有没有更优雅的方法?

1wnzp6jl

1wnzp6jl1#

静态分页

如果您事先知道页面索引,并且有规则生成它。

var elementsPerPage = 10;

Flux.generate(
        () -> 0,
        (pageIndex, emitter) -> {
            if (pageIndex < 30) {
                emitter.next(pageIndex);
            } else {
                emitter.complete();
            }
            return pageIndex + elementsPerPage;
        })
        .flatMap(pageIndex -> webClient
                .get()
                .uri("/blah?pageIndex={pageIndex}", pageIndex)
                .retrieve()
                .bodyToMono(BlahServiceResponse.class))
        .subscribe(System.out::println);

动态分页

如果下一页索引依赖于最后查询的页。

public static void main(String[] args) {
    var elementsPerPage = 10;

    callWithPageIndex(0)
            .expand(pagedResponse -> {
                if (pagedResponse.getResponse().isEmpty()) {
                    return Mono.empty();
                } else {
                    return callWithPageIndex(pagedResponse.getPageIndex() + elementsPerPage);
                }
            })
            .subscribe(System.out::println);
}

private static Mono<PagedResponse<BlahServiceResponse>> callWithPageIndex(Integer pageIndex) {
    return webClient
            .get()
            .uri("/blah?pageIndex={pageIndex}", pageIndex)
            .retrieve()
            .bodyToMono(BlahServiceResponse.class)
            .map(response -> new PagedResponse<>(pageIndex, response));
}

@lombok.Value
static class PagedResponse<T> {
    int pageIndex;
    T response;
}

相关问题