如何提供mono消费者作为方法参数?

7fyelxc5  于 2021-07-13  发布在  Java
关注(0)|答案(2)|浏览(332)

我有一个通用方法,它并行执行一堆请求,将它们发送到外部webapi,并收集结果。
问题:调用方应该能够为web请求的子例程提供成功和错误处理程序。但是怎么做呢?
例子:

public <Req, Rsp> List<Rsp> sendApiRequests(List<Req> requests, Class<Rsp> type) {
    return Flux.fromIterable(requests)
            .flatMap(req -> webClient.post()
                       .uri(uri)
                       .bodyValue(req)
                       .retrieve()
                       .bodyToMono(type)
                       .//TODO how can I add a generic handler methods here?
                       .)
            .filter(rsp -> rsp.success() && rsp.results > 0) //some common filters
            .collectList()
            .block();
}

调用者应该能够提供一个非常有用的子例程。它可以提供一个 .doOnSuccess() ,一个 .doOnerror() 或者 .onErrorMap() 功能。以及任何其他或全部。
伪代码:

sendApiRequests(req, MyDto.class, req -> <subroutine>.doOnSuccess(..)
                                                     .doOnError(..)
                                                     .onErrorResume(..));

这有可能吗?因为我不暴露 Mono 从房间里出来 webClient.post() 请求调用者时,我不知道如何在其上链接处理程序方法。

0ve6wy6x

0ve6wy6x1#

您可以在这里使用transform操作符。我不会称之为添加“消费者”,而是在React管道的特定点插入额外的操作符。它看起来像:

public String blockingCall(Function<Mono<String>, Publisher<String>> transformer) {
    return Mono.just("hi")
               .transform(transformer)
               .block();
}

使用方法如下:

blockingCall(it -> it.doOnSuccess(...).doOnError(...))
33qvvth1

33qvvth12#

你应该使用 Mono#transform . 根据React堆文件:
transform操作符允许您将操作符链的一部分封装到函数中。该函数在装配时应用于原始操作符链,以使用封装的操作符对其进行扩充。这样做将对序列的所有订户应用相同的操作,基本上等同于直接链接操作符。
就你而言:

public <Req, Rsp> List<Rsp> sendApiRequests(List<Req> requests, Class<Rsp> type, Function<Mono<Rsp>, Mono<Rsp>> subroutine) {
        return Flux.fromIterable(requests)
                .flatMap((Req req) -> webClient.post()
                        .uri(uri)
                        .bodyValue(req)
                        .retrieve()
                        .bodyToMono(type)
                        .transform(subroutine)
                )
                .filter(rsp -> rsp.success() && rsp.results > 0) //some common filters
                .collectList()
                .block();
    }

相关问题