java block()/blockFirst()/blockLast()是在交换()之后调用bodyToMono时发生的阻塞错误

zpgglvta  于 2023-02-02  发布在  Java
关注(0)|答案(6)|浏览(243)

我尝试使用Webflux将生成的文件流传输到另一个位置,但是,如果文件的生成遇到错误,api会返回成功,但会使用DTO详细说明生成文件时的错误,而不是文件本身。这是使用一个非常旧且设计糟糕的api,所以请原谅使用post和api设计。
api调用(exchange())的响应是ClientResponse。在这里,我可以使用bodyToMono转换为ByteArrayResource,它可以流传输到文件中,或者,如果创建文件时出错,我也可以使用bodyToMono转换为DTO。但是,我似乎无法根据ClientResponse头的内容执行或。
在运行时,我得到一个由以下原因引起的非法状态异常
block()/blockFirst()/blockLast()是阻塞的,线程React器-http-client-epoll-12不支持这种情况
我认为我的问题是我不能在同一个函数链中调用block()两次。
我的代码片段如下所示:

webClient.post()
        .uri(uriBuilder -> uriBuilder.path("/file/")
                                      .queryParams(params).build())
        .exchange()
        .doOnSuccess(cr -> {
                if (MediaType.APPLICATION_JSON_UTF8.equals(cr.headers().contentType().get())) {
                    NoPayloadResponseDto dto = cr.bodyToMono(NoPayloadResponseDto.class).block();
                    createErrorFile(dto);
                }
                else {
                    ByteArrayResource bAr = cr.bodyToMono(ByteArrayResource.class).block();
                    createSpreadsheet(bAr);
                }
            }
        )
        .block();

基本上,我希望根据标头中定义的MediaType以不同的方式处理ClientResponse。
这可能吗?

gmol1639

gmol16391#

首先,有几件事可以帮助您理解解决这个用例的代码片段。
1.绝不应在返回React类型的方法内调用阻塞方法;您将阻塞应用程序的少数线程之一,这对应用程序非常不利
1.无论如何,从React器3.2 blocking within a reactive pipeline throws an error开始
1.正如注解中所建议的那样,调用subscribe也不是一个好主意,它或多或少地像在单独的线程中作为一个任务启动该作业,当它完成时,您将得到一个回调(subscribe方法可以用lambdas来表示),但实际上您是在将当前管道与该任务解耦。在您有机会读取完整的响应正文并将其写入文件之前,客户端HTTP响应可能已关闭,资源可能已清理
1.如果您不想在内存中缓冲整个响应,Spring提供了DataBuffer(想想可以池化的ByteBuffer示例)。
1.如果您实现的方法本身是阻塞的(例如返回void),则可以调用block,例如在测试用例中。
下面是一个代码片段,您可以使用它来完成此操作:

Mono<Void> fileWritten = WebClient.create().post()
        .uri(uriBuilder -> uriBuilder.path("/file/").build())
        .exchange()
        .flatMap(response -> {
            if (MediaType.APPLICATION_JSON_UTF8.equals(response.headers().contentType().get())) {
                Mono<NoPayloadResponseDto> dto = response.bodyToMono(NoPayloadResponseDto.class);
                return createErrorFile(dto);
            }
            else {
                Flux<DataBuffer> body = response.bodyToFlux(DataBuffer.class);
                return createSpreadsheet(body);
            }
        });
// Once you get that Mono, you should give plug it into an existing
// reactive pipeline, or call block on it, depending on the situation

正如您所看到的,我们没有在任何地方阻塞,处理I/O的方法返回Mono<Void>,这是done(error)回调的React性等价物,它在事情完成时发出信号,如果发生错误。
因为我不确定createErrorFile方法应该做什么,所以我提供了一个createSpreadsheet的示例,它只将主体字节写入文件。注意,因为数据缓冲区可能会被回收/池化,所以我们需要在完成后释放它们。

private Mono<Void> createSpreadsheet(Flux<DataBuffer> body) {
    try {
        Path file = //...
        WritableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.WRITE);
        return DataBufferUtils.write(body, channel).map(DataBufferUtils::release).then();
    } catch (IOException exc) {
        return Mono.error(exc);
    }
}

通过这种实现,您的应用程序将在给定时间在内存中保存一些DataBuffer示例(出于性能原因,React式操作符将预取值),并将以React式方式写入字节。

t5fffqht

t5fffqht2#

[2021年10月19日更新]

toProcessor()现已弃用。
考虑使用

myMono.toFuture().get();

正如投票最多的答案所指出的,永远不要阻塞。在我的例子中,这是唯一的选择,因为我们在一段命令式代码中使用了一个React式库。阻塞可以通过在处理器中 Package mono来完成:

myMono.toProcessor().block()
5fjcxozz

5fjcxozz3#

要在服务器请求池之外执行客户端请求,请使用myWebClientMono.share().block();

7uhlpewt

7uhlpewt4#

尝试myMono.subscribeOn(Schedulers.boundedElastic()).toFuture().get(5L, TimeUnit.SECONDS)

lokaqttq

lokaqttq5#

[更新日期:2023年1月31日]

我想补充一下这个主题,并分享我的解决方案,因为exchange()操作符从5.3版本开始就被弃用了。
详情:
由于可能泄漏内存和/或连接,自5.3起弃用;请使用exchangeToMono(函数)、exchangeToFlux(函数);还可以考虑使用retrieve(),它通过ResponseEntity提供对响应状态和报头的访问以及错误状态处理。
因此,我将使用retrieve()操作符给出此任务的一个示例,并使用 * streaming * 方法以某种方式简化将文件保存到文件系统的过程。
因为它给了我们访问头部和响应主体的机会,我们可以这样做:

Mono<Void> fileWritten = webClient.get()
        .uri(uriBuilder -> uriBuilder.path("/file/").build())
        .retrieve()                         // using retrieve since exchange() is deprecated
        .toEntityFlux(DataBuffer.class)     // return Mono<ResponseEntity<Flux<DataBuffer>>>
        .flatMap(entity -> {
            // here we can access headers, body and etc. since we have ResponseEntity here
            if (MediaType.APPLICATION_JSON_VALUE.equals(entity.getHeaders().getContentType().toString())) {
                return createFile(entity.getBody(), "no_file_payload_response"); // save no payload body to a file
            } else {
                return createFile(entity.getBody(), "file"); // save file body to a file
            }
        });

fileWritten.subscribe(); // just for testing purposes, subscribe where you want depending on your requirements

将流Publisher<DataBuffer>保存到文件系统的方法:

private Mono<Void> createFile(Publisher<DataBuffer> body, String fileName) {
    Path path = Path.of("your_desired_path/" + fileName);
    return DataBufferUtils.write(body, path,
            StandardOpenOption.CREATE_NEW); // use OpenOption you want depending on your requirements
}

此外,如您所见,使用DataBufferUtils.write(),我们可以直接将流写入文件
这里我们不使用任何阻塞API,如Input/OutputStream,因此我们不会同时在内存中缓冲文件的整个内容。

ubof19bj

ubof19bj6#

RestResultMessage message= createWebClient()
                .get()
                .uri(uri)
                .exchange()
                .map(clientResponse -> {
                    //delegation
                    ClientResponseWrapper wrapper = new 
                                 ClientResponseWrapper(clientResponse);
                    return Mono.just(wrapper);
                })
                .block() //wait until request is not done
                .map(result -> {  
                    //convert to any data
                    if (!result.statusCode().isError()){
                       //extract the result from request
                        return create(RestResultMessage.Result.success, result.bodyToMono(String.class).block());}
                    } else {
                        return create(RestResultMessage.Result.error, result.statusCode().name());
                    }
                })
                .block();

相关问题