嗨,我有个很重要的问题。我尝试使用reactive r2dbc创建一个批处理,并使用transactional对方法进行注解。但如果我同时使用事务性代码和批处理代码,代码就会挂起,无法工作。下面是代码
@Transactional /***Transaction***/
@GetMapping("/batchFetchData")
public Flux<Object> batchFetch() {
long startTime = System.currentTimeMillis();
Mono.from(databaseConfiguration.connectionFactory().create())
.flatMapMany(connection -> Flux.from(connection
.createBatch() /***Creating batch***/
.add("SELECT * FROM xtable where xId = 232323")
.add("SELECT * FROM ytable where yId = 454545")
.add("SELECT * FROM ztable where zId = 676767")
//.execute())); /***Execution batch***/
.execute())).as(StepVerifier::create)
.expectNextCount(3) /***Expect count batch***/
.verifyComplete(); /***Verify batch***/
LOGGER.info("Time taken to batchFetch "+(System.currentTimeMillis() - startTime));
return null;
}
2条答案
按热度按时间weylhg0b1#
你的问题是:
在React式应用程序中,应该返回mono/flux,即使流中没有任何项,也应该返回
Mono.emtpy
相反。检查我的示例插入多个记录。
并对试验结果进行逐步验证。
对于webflux应用程序中的事务支持,您必须阅读相关文档,以检查它是否支持一般本地事务或使用它时的限制。
如果事务得到很好的支持,有两种方法可以使用它。
注入
TransactionalOperator
(与传统的TransactionTemplate
) Package 您的业务逻辑。应用
@Transaction
类或方法的注解。jei2mxaa2#
你正在打破React链。
在React式编程中,在订阅之前什么都不会发生。
那是什么意思,我可以用一个小例子来说明。
而:
将打印:
如果你有一个函数,这也适用
您需要做的是:
发生了什么?在React式编程中,一旦有人订阅mono或flux,程序就会遍历并构建回调链,直到找到开始产生值的生产者(在我的例子中是just语句)。这个阶段称为“装配阶段”。当这个阶段完成后,React链将开始为订阅的任何人生成值。
如果没有人订阅,就不会建立链。
那么谁是订户呢?它通常是价值的最终消费者。因此,例如,发起呼叫的网页或移动应用程序,但如果是发起呼叫的网页(例如cron作业),也可以是您的spring引导服务。
让我们看看你的代码:
那你怎么解决呢?
你不需要破坏React链。这是基本的React式编程。
“我不想退货”
这就是
Mono#then
声明用于。当链中的每个部分都完成时,它会发出完成的信号,然后将值从一个部分传递到下一个部分,然后再次发出信号,然后传递值等等then
声明它只会发出完成的信号,而不返回任何内容(或者实际上它正在返回一个Mono<Void>
因为在React链中不允许null)。你必须始终返回,这样每一步都可以传递它的完整信号。我还删除了您在代码中使用的stepverifier,因为它通常用于验证单元测试中的步骤,而不用于生产代码。你可以在这里了解更多。
如果你想学习React式编程,我建议你这样做,因为它是惊人的,我喜欢它,我强烈建议你阅读优秀的React堆文档介绍React式编程,他们将解释什么都不会发生的概念,直到你订阅等。