java 使用webflux进行异步调用,然后在响应时执行任务

cunj1qz1  于 2023-09-29  发布在  Java
关注(0)|答案(1)|浏览(168)

我需要使用分页响应并发调用API,然后对每个响应的每个项执行异步任务。我的一个朋友建议WebFlux是解决这个问题的方法,所以我决定给予一下,但事实证明,如果没有适当的培训,它很难使用,我在这方面非常新。
我必须同时获取API的前5页(每页10个项目),并同时使用每个项目的URL值保存图像。没有必要等待回应。
MyService

@Service
@Slf4j
public class MyService {

    WebClient webClient = WebClient.create("http://host");

    public void downloadImages() {
        List<Integer> range = IntStream.rangeClosed(1, 5)
                .boxed().toList();
        Flux.fromIterable(range)
                .flatMap(pageNumber -> fetchPages(pageNumber), 10); //10 asynchronous calls
    }

    private Flux<APIResponse> fetchPages(int page) {
        return webClient.get()
                .uri("/images?page={page}", page)
                .retrieve()
                .bodyToFlux(APIResponse.class)
                .flatMap(response -> downloadImage(response));
    }

    //What does this method actually looks like? Publisher was auto-completed. 
    // It needs to iterate each actual response
    // as it becomes availables and perform 
    private Publisher downloadImages(APIResponse response) {
        //download each image using url of response.getImages();
        //best to do parallelStream foreach for this task?
        // or use a thread pool to download each image?

        response.getImages().forEach(System.out::println); //this is never executed
        return null; //what's a publisher?
    }
}

我的APIResponse有一个像这样的Image对象:

@Data
public class ImageResponse {
    private Long id;
    private String imageURL;
}
bxgwgixi

bxgwgixi1#

首先是:PublisherMonoFlux的父类。它在reactivestream API中被定义为价值生产者(流源)。Mono是发送0或1个元素的特化,Flux是向许多元素发送0的特化。
此外,通常,发布者直到subscription(subscribe()或block())才被触发。通常,在Spring应用程序中,flux或Mono被传播到Web(您的rest控制器或请求路由器)或数据库层(reactive/r2 dbc存储库),它们将负责订阅flux。虽然您仍然可以自己订阅,但您有责任在Scheduler上使用元素和调度任务。
如果您下载图像原始内容,则可按如下方式重新提供服务:

@Service
@Slf4j
public class MyService {

    WebClient webClient = WebClient.create("http://host");

    /**
     * @return One data buffer per fetched image. 
     */
    public Flux<DataBuffer> downloadImages() {
        return Flux.range(1, 5)
                   .flatMap(this::fetchPages, 10)
                   .flatMap(this::downloadImage);
    }

    private Flux<APIResponse> fetchPages(int page) {
        return webClient.get()
                .uri("/images?page={page}", page)
                .retrieve()
                .bodyToFlux(APIResponse.class);
    }

    
    private Flux<DataBuffer> downloadImages(APIResponse response) {
        return response.getImages()
                       .flatMap(this::downloadImage);
    }

    private Mono<DataBuffer> downloadImage(URL imageUrl) {
        return webClient.get().uri(url)
                        .retrieve()
                        .bodyToMono<DataBuffer>()
    }
}

在单元测试中,您可以使用reactor test API来测试它,如下所示:

StepVerifier.create(myService.downloadImages())
            // Number of images you expect to download
            .expectNextCount(2)
            .verifyComplete();

相关问题