如何使用SpringWebClient进行同步调用?

qzlgjiam  于 2021-07-06  发布在  Java
关注(0)|答案(1)|浏览(444)

resttemplate文档中的spring有以下注解:
注意:从5.0开始,这个类处于维护模式,只接受少量的更改请求和bug。请考虑使用org.springframework.web.reactive.client.webclient,它有一个更现代的api,支持同步、异步和流式场景。
https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/web/client/resttemplate.html
当我尝试使用webclient并进行如下同步调用时:

@RestController
@RequestMapping("/rating")
public class Controller {

    @Autowired
    private RestTemplate restTemplate;

    @Autowired
    private WebClient.Builder webClientBuilder;

    @RequestMapping("/{userId}")
    public UserRating getUserRating(@PathVariable("userId") String userId) {

//      return restTemplate.getForObject("http://localhost:8083/ratingsdata/user/" + userId, UserRating.class);
        return webClientBuilder.build()
                .get().uri("http://localhost:8083/ratingsdata/user/" + userId)
                .retrieve()
                .bodyToMono(UserRating.class)
                .block();

    }
}

pom.xml项目:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.daren.tutorial.movie</groupId>
    <artifactId>catalog</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>catalog</name>
    <description>Movie catalog</description>

    <properties>
        <java.version>11</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

我得到以下错误:

2020-11-15 10:40:15.359 ERROR 7175 --- [or-http-epoll-2] a.w.r.e.AbstractErrorWebExceptionHandler : [8776f4a6-1]  500 Server Error for HTTP GET "/rating/15"

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-epoll-2
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.4.0.jar:3.4.0]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    |_ checkpoint ⇢ HTTP GET "/rating/15" [ExceptionHandlingWebHandler]
Stack trace:
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.Mono.block(Mono.java:1679) ~[reactor-core-3.4.0.jar:3.4.0]
        at com.daren.tutorial.movie.catalog.resources.Controller.getUserRating(Controller.java:39) ~[classes/:na]
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64) ~[na:na]
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
        at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
        at org.springframework.web.reactive.result.method.InvocableHandlerMethod.lambda$invoke$0(InvocableHandlerMethod.java:146) ~[spring-webflux-5.3.1.jar:5.3.1]
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:251) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:336) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:99) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2346) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2154) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2028) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:191) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.Mono.subscribe(Mono.java:3972) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:154) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:281) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:860) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2346) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2154) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2028) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.Mono.subscribe(Mono.java:3987) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:448) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:218) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:164) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.Mono.subscribe(Mono.java:3987) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:173) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.netty.http.server.HttpServer$HttpServerHandle.onStateChange(HttpServer.java:611) ~[reactor-netty-http-1.0.1.jar:1.0.1]
        at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:612) ~[reactor-netty-core-1.0.1.jar:1.0.1]
        at reactor.netty.transport.ServerTransport$ChildObserver.onStateChange(ServerTransport.java:453) ~[reactor-netty-core-1.0.1.jar:1.0.1]
        at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:510) ~[reactor-netty-http-1.0.1.jar:1.0.1]
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94) ~[reactor-netty-core-1.0.1.jar:1.0.1]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:185) ~[reactor-netty-http-1.0.1.jar:1.0.1]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) ~[netty-transport-native-epoll-4.1.54.Final-linux-x86_64.jar:4.1.54.Final]
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480) ~[netty-transport-native-epoll-4.1.54.Final-linux-x86_64.jar:4.1.54.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[netty-transport-native-epoll-4.1.54.Final-linux-x86_64.jar:4.1.54.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.54.Final.jar:4.1.54.Final]
        at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

对restemplate对象使用注解行可以正常工作。我是否必须使用不推荐使用的restemplate来进行同步rest调用?

inn6fuwd

inn6fuwd1#

我找到了问题的原因并解决了它:
因为类路径上只有webflux,所以应用程序是在nettyweb服务器上启动的。此配置中的请求在reactor-http-epoll-2线程上处理,该线程不是reactor.core.scheduler.nonblocking的示例。它引起了非法国家的例外。
要修复它,我必须添加以下依赖项:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

应用程序现在默认在tomcat上启动,处理请求的线程http-nio-8081-exec-1是非阻塞的示例。

相关问题