java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio

Posted by 5gfr0r5j on 2023-01-07 in Java

I have a problem with a Spring WebFlux project. First, I created two POJO model classes, User and Post.

    • User.java
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder 
@Table("blog_user") 
public class User {
     
    @Id
    @Column("user_id")
    private Long id;
    
    @Column
    private String username;
 
    @Column
    @JsonIgnore
    private String password;
}
    • Post.java
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Table
public class Post {
 
    @Id
    @Column("post_id")
    private Long id;
    
    @Column
    private String title;
  
    @Column
    private String body;
    
    @Column("user_id")
    private User user;
}

And the corresponding repository interfaces:

public interface UserReactiveMysqlRepository extends ReactiveCrudRepository<User, Long> {
}

public interface PostReactiveMysqlRepository extends ReactiveCrudRepository<Post, Long> {
}

I then created the WebFlux handler class below, which tries to extract the User from a Mono and set it on the Post.

@Component
public class PostHandler {
 
    @Autowired
    private PostReactiveMysqlRepository postRepository;
    
    @Autowired
    private UserReactiveMysqlRepository userRepository;

    public Mono<ServerResponse> findAll(ServerRequest request) {
        Flux<Post> fluxPost = postRepository.findAll()
                .filter(p -> (p.getUser() == null))
                .map(p -> {
                    User u = userRepository.findById(p.getId()).block();  // This line throws Exception.
                    p.setUser(u);
                    return p;
                });
    
        return ServerResponse.ok().body(fluxPost, Post.class);
    }
}

However, the line that calls block() throws the following error:

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.5.1.jar:3.5.1]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    *__checkpoint ⇢ Handler com.aaa.blog.wf.router.BlogWebFluxEndpointRouter$$Lambda$1434/0x00000008006ce410@7d628cef [DispatcherHandler]
    *__checkpoint ⇢ HTTP GET "/route/post/all" [ExceptionHandlingWebHandler]
Original Stack Trace:
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.Mono.block(Mono.java:1710) ~[reactor-core-3.5.1.jar:3.5.1]
        at com.aaa.blog.wf.handler.PostHandler.lambda$1(PostHandler.java:47) ~[classes/:na]
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:113) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:345) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.innerNext(FluxConcatMapNoPrefetch.java:258) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:863) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.FluxConcatMap$WeakScalarSubscription.request(FluxConcatMap.java:479) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2305) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.request(FluxConcatMapNoPrefetch.java:338) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.FluxFilter$FilterSubscriber.request(FluxFilter.java:186) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2305) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.request(FluxConcatArray.java:276) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.5.1.jar:3.5.1]
        at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.request(ChannelSendOperator.java:296) ~[spring-web-6.0.3.jar:6.0.3]
        at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.netty.channel.MonoSendMany$SendManyInner.onSubscribe(MonoSendMany.java:254) ~[reactor-netty-core-1.1.1.jar:1.1.1]
        at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92) ~[reactor-core-3.5.1.jar:3.5.1]
        at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.subscribe(ChannelSendOperator.java:358) ~[spring-web-6.0.3.jar:6.0.3]
        at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:67) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.Flux.subscribe(Flux.java:8660) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.netty.channel.MonoSendMany.subscribe(MonoSendMany.java:102) ~[reactor-netty-core-1.1.1.jar:1.1.1]
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:240) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.netty.FutureMono$FutureSubscription.operationComplete(FutureMono.java:196) ~[reactor-netty-core-1.1.1.jar:1.1.1]
        at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:583) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:559) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:717) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:272) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:352) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:421) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:895) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:921) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:907) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:893) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush(CombinedChannelDuplexHandler.java:531) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:125) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.CombinedChannelDuplexHandler.flush(CombinedChannelDuplexHandler.java:356) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:923) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:907) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:893) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:923) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:941) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1247) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
        at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

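As you know, the default web server for Spring WebFlux is Netty, and Netty does not seem to support the block() API. Some replies suggest switching the web server to Tomcat, but I don't think that is the right solution. How do I extract the User value from the Mono? Or does my code contain a mistake? Any reply would be greatly appreciated. Best regards.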


#1 xriantvc

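Blocking code is not allowed on a reactive scheduler. The thread named in the error, reactor-http-nio-2, is a Netty event-loop thread, and Reactor throws this IllegalStateException when block() is called on such a thread. You need to define the flow with the reactive API (map/flatMap) and return that flow from the controller.
I see that PostReactiveMysqlRepository is reactive and userRepository.findById() returns Mono<User>, so you could use: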

public Mono<ServerResponse> findAll(ServerRequest request) {
    Flux<Post> fluxPost = postRepository.findAll()
            .filter(p -> (p.getUser() == null))
            .flatMap(p -> 
                    userRepository.findById(p.getId())
                            .map(user -> {
                                p.setUser(user);
                                return p;
                            })
            );

    return ServerResponse.ok().body(fluxPost, Post.class);
}

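The key points here are:

  • Use flatMap instead of map if the operation is asynchronous (that is, it returns a Mono or Flux).
  • Use map instead of block() to work with the value while staying inside the reactive flow.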
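
The map/flatMap split in the points above is not specific to Reactor. As a rough analogy, the JDK's CompletableFuture has the same distinction: thenApply takes a synchronous callback (like map), and thenCompose takes a callback that itself returns an asynchronous result (like flatMap). The sketch below uses only the JDK so that it runs without Reactor on the classpath. FlatMapAnalogy, findUser, attachUser, and the simplified User and Post classes are hypothetical stand-ins, not code from the question.

```java
import java.util.concurrent.CompletableFuture;

public class FlatMapAnalogy {

    // Simplified, hypothetical stand-ins for the entities in the question.
    static final class User {
        final long id;
        final String username;
        User(long id, String username) { this.id = id; this.username = username; }
    }

    static final class Post {
        final long id;
        User user; // filled in once the asynchronous lookup completes
        Post(long id) { this.id = id; }
    }

    // Hypothetical async lookup, standing in for userRepository.findById(id).
    static CompletableFuture<User> findUser(long id) {
        return CompletableFuture.supplyAsync(() -> new User(id, "user-" + id));
    }

    // thenApply ~ map: the callback is synchronous and returns a plain value.
    static CompletableFuture<String> usernameOf(long id) {
        return findUser(id).thenApply(u -> u.username);
    }

    // thenCompose ~ flatMap: the callback returns another async result,
    // which is flattened instead of being nested or blocked on.
    static CompletableFuture<Post> attachUser(CompletableFuture<Post> postFuture) {
        return postFuture.thenCompose(p ->
                findUser(p.id).thenApply(u -> {
                    p.user = u;
                    return p;
                }));
    }

    public static void main(String[] args) {
        // join() blocks; that is fine on the main thread of a demo, but it is
        // the kind of call that must not run on an event-loop thread.
        Post post = attachUser(CompletableFuture.completedFuture(new Post(7))).join();
        System.out.println(post.user.username); // prints "user-7"
        System.out.println(usernameOf(3).join()); // prints "user-3"
    }
}
```

In the answer's Reactor code the same shape appears: the outer flatMap corresponds to thenCompose, and the inner map that sets the user on the post corresponds to thenApply.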
