R2DBC connection closed exception

Posted by oxcyiej7 on 2021-07-03 in Java

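Please help me understand and fix the error I get when I try to fetch only the first of the many records returned by a SELECT statement, using R2DBC against an MS SQL Server database.
My Gradle dependencies: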

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.security:spring-security-oauth2-client'
    implementation 'io.springfox:springfox-boot-starter:3.0.0'
    implementation 'org.springframework.data:spring-data-r2dbc'
    implementation 'io.r2dbc:r2dbc-mssql'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
    annotationProcessor 'org.projectlombok:lombok'
}

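My code (sensitive content masked with x). [Please note that this is not production-ready code; it is part of a very early-stage PoC, so most of it does not meet production standards. Please bear with that.]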

@Configuration
@EnableR2dbcRepositories(basePackages="com.xxxx.admin.db.repo")
public class DatabaseConfiguration extends AbstractR2dbcConfiguration
{

    @Autowired
    private Environment env;

    @Bean
    @Override
    public ConnectionFactory connectionFactory() {
        return new MssqlConnectionFactory(MssqlConnectionConfiguration.builder().host("xxxxxx")
                .database("xxxx").username(env.getProperty("xxxxx")).password(env.getProperty("xxxxxx")).build());
    }

}

@RestController
@RequestMapping("/api/v1/admin")
public class AdminController  {
    @GetMapping("/junk")
    public Mono<Map<String, Object>> getUser() {
        return tmoUserRepo.getUser("XXXXXXXXXXX");      
    }
}

@Repository
public class TmoUserRepo {

private static final String GET_USER_SQL = "SELECT xxxxxxx WHERE ID=:userId ORDER BY XXX DESC, YYY ASC, ZZZ ASC;" ; 

    @Autowired
    private DatabaseClient dbClient;
    public Mono<Map<String, Object>> getUser(String userId) {
        dbClient.execute(GET_USER_SQL).bind("userId",userId).fetch().first().subscribe(System.out::println);
        return null;
    }
}

Scenario 1. This produces the error (dbClient is an instance of DatabaseClient):

dbClient.execute(GET_USER_SQL).bind("userId",userId).fetch().first().subscribe(System.out::println);

Scenario 2. This completes successfully:

dbClient.execute(GET_USER_SQL).bind("userId",userId).fetch().first().all(System.out::println);

=========================
What happens in scenario 1: the following is printed twice in the log:

2020-12-04 18:37:38.569 DEBUG 18636 --- [actor-tcp-nio-1] o.s.d.r2dbc.core.DefaultDatabaseClient   : Executing SQL statement [SELECT usr.name_first AS namefirst,    
    usr.name_last AS namelast,
<<.... rest of SQL...>>

Error message:

2020-12-04 18:37:40.278 ERROR 18636 --- [actor-tcp-nio-1] reactor.core.publisher.Operators         : Operator called default onErrorDropped

io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionClosedException: Connection closed
    at io.r2dbc.mssql.client.ReactorNettyClient.lambda$static$1(ReactorNettyClient.java:93) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
    at io.r2dbc.mssql.client.ReactorNettyClient.drainError(ReactorNettyClient.java:629) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
    at io.r2dbc.mssql.client.ReactorNettyClient.handleClose(ReactorNettyClient.java:614) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
    at io.r2dbc.mssql.client.ReactorNettyClient.access$600(ReactorNettyClient.java:85) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
    at io.r2dbc.mssql.client.ReactorNettyClient$2.onComplete(ReactorNettyClient.java:289) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2016) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:441) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:238) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:362) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:358) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.r2dbc.mssql.client.ssl.TdsSslHandler.channelRead(TdsSslHandler.java:380) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.53.Final.jar:4.1.53.Final]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

2020-12-04 18:37:40.280 ERROR 18636 --- [actor-tcp-nio-1] r.n.channel.ChannelOperationsHandler     : [id: 0x2fb997cb, L:/10.100.26.236:55810 ! R:tmidevsql12/10.211.0.250:1433] Error was received while reading the incoming data. The connection will be closed.

reactor.core.Exceptions$BubblingException: io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionClosedException: Connection closed
    at reactor.core.Exceptions.bubble(Exceptions.java:173) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.Operators.onErrorDropped(Operators.java:635) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at io.r2dbc.mssql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onError(FluxDiscardOnCancel.java:95) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.FluxFilter$FilterSubscriber.onError(FluxFilter.java:151) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.FluxHandle$HandleConditionalSubscriber.onError(FluxHandle.java:406) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.EmitterProcessor.checkTerminated(EmitterProcessor.java:489) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:356) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.EmitterProcessor.onError(EmitterProcessor.java:286) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.FluxHandle$HandleSubscriber.onError(FluxHandle.java:196) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onError(MonoFlatMapMany.java:247) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.EmitterProcessor.checkTerminated(EmitterProcessor.java:489) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:356) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.EmitterProcessor.onError(EmitterProcessor.java:286) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at io.r2dbc.mssql.client.ReactorNettyClient.drainError(ReactorNettyClient.java:629) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
    at io.r2dbc.mssql.client.ReactorNettyClient.handleClose(ReactorNettyClient.java:614) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
    at io.r2dbc.mssql.client.ReactorNettyClient.access$600(ReactorNettyClient.java:85) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
    at io.r2dbc.mssql.client.ReactorNettyClient$2.onComplete(ReactorNettyClient.java:289) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2016) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:441) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:238) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:362) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:358) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.r2dbc.mssql.client.ssl.TdsSslHandler.channelRead(TdsSslHandler.java:380) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.53.Final.jar:4.1.53.Final]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionClosedException: Connection closed
    at io.r2dbc.mssql.client.ReactorNettyClient.lambda$static$1(ReactorNettyClient.java:93) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
    ... 32 common frames omitted

2020-12-04 18:37:40.280  WARN 18636 --- [actor-tcp-nio-1] reactor.netty.channel.FluxReceive        : [id: 0x2fb997cb, L:/10.100.26.236:55810 ! R:tmidevsql12/10.211.0.250:1433] An exception has been observed post termination, use DEBUG level to see the full stack: reactor.core.Exceptions$BubblingException: io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionClosedException: Connection closed
Answer 1, by iqih9akk

Short answer

Your problem is probably this:

dbClient.execute(GET_USER_SQL).bind("userId",userId).fetch().first().subscribe(System.out::println);
return null;

You probably need to change it to:

return dbClient.execute(GET_USER_SQL)
               .bind("userId",userId)
               .fetch()
               .first();

Long answer

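Reactive programming is very different from regular Java programming, and you will run into trouble if you try to write it the way you write regular Java.
First, null is not a permitted value in reactive programming, so never return null.
Reactive programming works with producers and consumers. Your application is the producer, and the calling client (web app, mobile app, Postman, curl, etc.) is the consumer. Whoever initiates the call is usually the consumer. The consumer starts by subscribing to the producer, so the client subscribes to the server.
When that happens, the server walks backwards from the end of your code until it finds something that produces a value (the database). Along the way it assembles a chain of callbacks. This part is called the assembly phase. Once this phase is complete, the application emits one value (if it is a Mono) or many values (if it is a Flux).
So nothing happens until someone subscribes.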

// Only a declaration, nothing happens
Mono.just("Foobar");

Whereas:

// Someone subscribes, chain is built and value is produced
Mono.just("Foobar").subscribe(s -> System.out.println(s));

The same applies to functions:

public Mono<String> getFoobar() {
    return Mono.just("Foobar");
}

getFoobar().subscribe(s -> System.out.println(s));
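
The "nothing happens until someone subscribes" rule is not specific to Reactor. As a rough illustration, the sketch below builds a tiny cold publisher on the JDK's own java.util.concurrent.Flow interfaces. ColdValue, Collecting and ColdValueDemo are hypothetical names made up for this example; they are not part of Reactor or Spring, and the class deliberately skips most of what a real publisher has to handle (error signalling, demand validation).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical toy type: a cold, single-value publisher built on the JDK Flow interfaces.
final class ColdValue<T> implements Flow.Publisher<T> {
    private final Supplier<T> source;

    ColdValue(Supplier<T> source) {
        this.source = source;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        AtomicBoolean done = new AtomicBoolean();
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                // The source runs only here, once a subscriber asks for data.
                if (n > 0 && done.compareAndSet(false, true)) {
                    subscriber.onNext(source.get());
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done.set(true);
            }
        });
    }
}

// Hypothetical subscriber that records what it receives.
final class Collecting<T> implements Flow.Subscriber<T> {
    final List<T> items = new ArrayList<>();
    boolean completed;

    @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
    @Override public void onNext(T item) { items.add(item); }
    @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
    @Override public void onComplete() { completed = true; }
}

final class ColdValueDemo {
    // Only a declaration: the supplier is stored, not invoked.
    static ColdValue<String> declare(AtomicInteger calls) {
        return new ColdValue<>(() -> {
            calls.incrementAndGet();
            return "Foobar";
        });
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        ColdValue<String> publisher = declare(calls);
        System.out.println("source calls after declaring: " + calls.get());

        Collecting<String> subscriber = new Collecting<>();
        publisher.subscribe(subscriber);
        System.out.println("source calls after subscribing: " + calls.get()
                + ", received " + subscriber.items);
    }
}
```

Declaring the publisher leaves the counter untouched, and every new subscription runs the source again. That second point is worth keeping in mind whenever the same chain ends up with more than one subscriber.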

But what if you don't want to return anything:

public Mono<Void> getFoobar() {
    return Mono.just("Foobar").then(); 
    // then will throw away whatever is returned from the previous and instead return a Mono<Void> that will signal to the next part in the chain that it is done.
}

getFoobar().subscribe();

Or you can return Mono.empty(), which will translate to a Mono<Void>:

public Mono<Void> getFoobar() {
    return Mono.just("Foobar").flatMap(s -> {
        // Return an empty, which will translate to a Mono<Void>
        return Mono.empty();
    });
}

getFoobar().subscribe();

If we look at your two scenarios:

dbClient.execute(GET_USER_SQL)
        .bind("userId",userId)
        .fetch()
        .first()
        .subscribe(System.out::println);

Here you subscribe yourself, so it runs, but then the calling client wants to subscribe as well, and that is where it breaks.

dbClient.execute(GET_USER_SQL)
        .bind("userId",userId)
        .fetch()
        .first()
        .all(System.out::println);

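This works, but whether the value is sent to the calling client depends on whether you return it. The all(...) call needs to be removed so that the value produced by first() is returned all the way out to the client.
I have explained this as best I can; all of it can be read in the excellent Reactor documentation.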
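
To contrast the two shapes of the repository method without pulling in Reactor, here is a deliberately simplified, JDK-only toy model. A Supplier stands in for a lazy Mono and calling get() stands in for subscribing. ToyUserRepo, ToyCaller and their methods are hypothetical names for illustration only. The exception thrown on a null return is a choice made for this sketch; how a real framework reacts to a null handler result is not modelled here.

```java
import java.util.Map;
import java.util.function.Supplier;

// Toy model: Supplier stands in for a lazy Mono, get() stands in for subscribing.
final class ToyUserRepo {
    int queriesRun = 0;

    // Stand-in for the database call: nothing runs until get() is invoked.
    private Supplier<Map<String, Object>> query(String userId) {
        return () -> {
            queriesRun++;
            return Map.of("id", userId);
        };
    }

    // Mirrors the question: triggers the pipeline itself, then hands the caller nothing.
    Supplier<Map<String, Object>> getUserSubscribingInside(String userId) {
        query(userId).get();
        return null;
    }

    // Mirrors the short answer: returns the pipeline and lets the caller trigger it.
    Supplier<Map<String, Object>> getUser(String userId) {
        return query(userId);
    }
}

final class ToyCaller {
    // Stand-in for the consumer, which triggers whatever the handler returned.
    static Map<String, Object> handle(Supplier<Map<String, Object>> returned) {
        if (returned == null) {
            // Toy behaviour only: there is no pipeline left to trigger.
            throw new IllegalStateException("handler returned null");
        }
        return returned.get();
    }

    public static void main(String[] args) {
        ToyUserRepo repo = new ToyUserRepo();
        System.out.println(handle(repo.getUser("42")));
    }
}
```

In the first method the query runs, but its result never reaches the caller. In the second, the query runs only when the caller asks for it, and the result travels all the way out.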
