spring集成事务

5lhxktic  于 2021-06-27  发布在  Java
关注(0)|答案(1)|浏览(302)

我有一个名为“creationchannel”的频道,它由mongomessagestore备份,如下所示:

@Bean
  ChannelMessageStore messageStore() {
    return new MongoDbChannelMessageStore(mongoDatabaseFactory);
  }

  @Bean
  PollableChannel creationChannel(ChannelMessageStore messageStore) {
    return MessageChannels.queue("creationChannel", messageStore, "create").get();
  }

我想在这里的流中使用它,但是我想确定的是,如果“createorderhandler”工作正常,那么来自那里的消息将是只读的(同样适用于“updateorderhandler”,但是使用不同的通道)。

...some code here...
        .<HeadersElement, OperationType>route(
            payload -> route(payload),
            spec -> spec
                .transactional(transactionHandleMessageAdvice)
                .subFlowMapping(
                    OperationType.New,
                    sf -> sf
                        .channel("creationChannel")
                        .log(Level.DEBUG, "Creation of a new order", Message::getPayload)
                        .transform(Mapper::mapCreate)
                        .handle(createOrderHandler,
                            handlerSpec -> handlerSpec.advice(retryOperationsInterceptor))
                )
                .subFlowMapping(
                    OperationType.Update,
                    sf -> sf
                        .channel("updateChannel")
                        .log(Level.DEBUG, "Update for existing order", Message::getPayload)
                        .transform(Mapper::mapUpdate)
                        .handle(updateOrderHandler,
                            handlerSpec -> handlerSpec.advice(retryOperationsInterceptor))
                )
        )
...some code here...

我尝试将“transactionhandlemessageadvice”配置为:

@Bean
  TransactionHandleMessageAdvice transactionHandleMessageAdvice(MongoTransactionManager transactionManager) {
    return new TransactionHandleMessageAdvice(transactionManager, new Properties());
  }

但在处理程序异常失败后,仍将从数据库中删除消息。
也许我应该为子流配置poller,然后用mongotransactionmanager进行配置?

dzjeubhm

dzjeubhm1#

也许我应该为子流配置poller,然后用mongotransactionmanager进行配置?
这是正确的假设。只要你有一条线在流动中移动(就像你的一样) PollableChannel creationChannel ),则在将消息放入存储时提交当前事务。在当前线程中不会发生更多的事情,因此,在您开始的当前事务中也不会发生更多的事情 .transactional(transactionHandleMessageAdvice) .
要使读取具有事务性,您确实必须配置 Poller.transform(Mapper::mapCreate) 终结点。因此,在您再次切换到不同的线程之前,来自该队列通道的每个轮询都将是事务性的。
因为事务是绑定到 ThreadLocal 当调用堆栈返回到事务发起方时,它被提交或回滚。使用异步逻辑,我们只想从生产者端“发送并忘记”,让使用者在数据准备就绪时处理数据。这不是设计事务的目的。

相关问题