我有一个Spring Webflux项目的问题。首先我做了两个Pojo模型类,User和Post。
- 用户. java**
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Table("blog_user")
public class User {
@Id
@Column("user_id")
private Long id;
@Column
private String username;
@Column
@JsonIgnore
private String password;
}
- Java后**
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Table
public class Post {
@Id
@Column("post_id")
private Long id;
@Column
private String title;
@Column
private String body;
@Column("user_id")
private User user;
}
和相关存储库接口
public interface UserReactiveMysqlRepository extends ReactiveCrudRepository<User, Long> {
}
public interface PostReactiveMysqlRepository extends ReactiveCrudRepository<Post, Long> {
}
我创建了如下的webflux处理程序类,它尝试从Mono中提取用户,并将用户值放入post类。
@Component
public class PostHandler {
@Autowired
private PostReactiveMysqlRepository postRepository;
@Autowired
private UserReactiveMysqlRepository userRepository;
public Mono<ServerResponse> findAll(ServerRequest request) {
Flux<Post> fluxPost = postRepository.findAll()
.filter(p -> (p.getUser() == null))
.map(p -> {
User u = userRepository.findById(p.getId()).block(); // This line throws Exception.
p.setUser(u);
return p;
});
return ServerResponse.ok().body(fluxPost, Post.class);
}
但是block()api行抛出了错误消息。
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.5.1.jar:3.5.1]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ Handler com.aaa.blog.wf.router.BlogWebFluxEndpointRouter$$Lambda$1434/0x00000008006ce410@7d628cef [DispatcherHandler]
*__checkpoint ⇢ HTTP GET "/route/post/all" [ExceptionHandlingWebHandler]
Original Stack Trace:
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.Mono.block(Mono.java:1710) ~[reactor-core-3.5.1.jar:3.5.1]
at com.aaa.blog.wf.handler.PostHandler.lambda$1(PostHandler.java:47) ~[classes/:na]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:113) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:345) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.innerNext(FluxConcatMapNoPrefetch.java:258) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:863) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxConcatMap$WeakScalarSubscription.request(FluxConcatMap.java:479) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2305) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.request(FluxConcatMapNoPrefetch.java:338) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxFilter$FilterSubscriber.request(FluxFilter.java:186) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2305) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.request(FluxConcatArray.java:276) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.5.1.jar:3.5.1]
at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.request(ChannelSendOperator.java:296) ~[spring-web-6.0.3.jar:6.0.3]
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.netty.channel.MonoSendMany$SendManyInner.onSubscribe(MonoSendMany.java:254) ~[reactor-netty-core-1.1.1.jar:1.1.1]
at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92) ~[reactor-core-3.5.1.jar:3.5.1]
at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.subscribe(ChannelSendOperator.java:358) ~[spring-web-6.0.3.jar:6.0.3]
at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:67) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.Flux.subscribe(Flux.java:8660) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.netty.channel.MonoSendMany.subscribe(MonoSendMany.java:102) ~[reactor-netty-core-1.1.1.jar:1.1.1]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:240) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.netty.FutureMono$FutureSubscription.operationComplete(FutureMono.java:196) ~[reactor-netty-core-1.1.1.jar:1.1.1]
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:583) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:559) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:717) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:272) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:352) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:421) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:895) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:921) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:907) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:893) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush(CombinedChannelDuplexHandler.java:531) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:125) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.CombinedChannelDuplexHandler.flush(CombinedChannelDuplexHandler.java:356) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:923) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:907) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:893) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:923) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:941) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1247) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
如你所知,Spring WebFlux的默认web服务器是netty。而且netty似乎不支持block()api函数。有些回复建议将web服务器更改为tomcat,但我认为这不是正确的解决方案。我如何从Mono中提取用户类值?或者我的代码包含错误的语法?任何回复都将是非常感谢的。最好的问候。
1条答案
按热度按时间xriantvc1#
React式计划程序上不允许阻塞代码。您需要使用React式API“map/flatMap”定义流,并从控制器返回该流。
我看到
PostReactiveMysqlRepository
是被动的,userRepository.findById()
返回Mono<User>
,您可以使用这里的要点是
flatMap
而不是map
(返回Mono
或Flux
)map
代替block
完成无功流