java—生成分页的webclient请求并在流量中使用响应

mcvgt66p  于 2021-07-14  发布在  Java
关注(0)|答案(1)|浏览(500)

我向第三方web服务重复发出分页webclient请求。我现在的实现可以工作,但是阻塞了。
我目前的实施情况:

  1. var elementsPerPage = 10;
  2. Flux
  3. .generate(
  4. () -> 0,
  5. (pageIndex, emitter) -> {
  6. BlahServiceResponse blahServiceResponse =
  7. webClient
  8. .get()
  9. .uri("/blah?pageIndex={pageIndex}", pageIndex)
  10. .retrieve()
  11. .bodyToMono(BlahServiceResponse.class)
  12. .block(); // Yuck!!!
  13. if (blahServiceResponse.getStudents().size() > 0) {
  14. emitter.next(blahServiceResponse);
  15. } else {
  16. emitter.complete();
  17. }
  18. return pageIndex + elementsPerPage;
  19. }
  20. )
  21. .subscribe(System.out::println); // Replace me with actual logic

出于可理解的原因,如果将上述代码更改为以下内容,则会引发“illegalstateexception:生成器未调用任何synchronoussink方法”异常:

  1. webClient
  2. .get()
  3. ...
  4. .bodyToMono(BlahServiceResponse.class)
  5. .subscribe(emitter::next);

所以我开始寻找一个异步接收器,并意识到它是通量单接收器。但据我所知,flux中没有一个builder方法支持使用flux | monosink生成有状态元素。
我遗漏了什么吗?有没有更优雅的方法?

1wnzp6jl

1wnzp6jl1#

静态分页

如果您事先知道页面索引,并且有规则生成它。

  1. var elementsPerPage = 10;
  2. Flux.generate(
  3. () -> 0,
  4. (pageIndex, emitter) -> {
  5. if (pageIndex < 30) {
  6. emitter.next(pageIndex);
  7. } else {
  8. emitter.complete();
  9. }
  10. return pageIndex + elementsPerPage;
  11. })
  12. .flatMap(pageIndex -> webClient
  13. .get()
  14. .uri("/blah?pageIndex={pageIndex}", pageIndex)
  15. .retrieve()
  16. .bodyToMono(BlahServiceResponse.class))
  17. .subscribe(System.out::println);

动态分页

如果下一页索引依赖于最后查询的页。

  1. public static void main(String[] args) {
  2. var elementsPerPage = 10;
  3. callWithPageIndex(0)
  4. .expand(pagedResponse -> {
  5. if (pagedResponse.getResponse().isEmpty()) {
  6. return Mono.empty();
  7. } else {
  8. return callWithPageIndex(pagedResponse.getPageIndex() + elementsPerPage);
  9. }
  10. })
  11. .subscribe(System.out::println);
  12. }
  13. private static Mono<PagedResponse<BlahServiceResponse>> callWithPageIndex(Integer pageIndex) {
  14. return webClient
  15. .get()
  16. .uri("/blah?pageIndex={pageIndex}", pageIndex)
  17. .retrieve()
  18. .bodyToMono(BlahServiceResponse.class)
  19. .map(response -> new PagedResponse<>(pageIndex, response));
  20. }
  21. @lombok.Value
  22. static class PagedResponse<T> {
  23. int pageIndex;
  24. T response;
  25. }
展开查看全部

相关问题