我有一个名为“creationchannel”的频道,它由mongomessagestore备份,如下所示:
@Bean
ChannelMessageStore messageStore() {
return new MongoDbChannelMessageStore(mongoDatabaseFactory);
}
@Bean
PollableChannel creationChannel(ChannelMessageStore messageStore) {
return MessageChannels.queue("creationChannel", messageStore, "create").get();
}
我想在这里的流中使用它,但是我想确定的是,如果“createorderhandler”工作正常,那么来自那里的消息将是只读的(同样适用于“updateorderhandler”,但是使用不同的通道)。
...some code here...
.<HeadersElement, OperationType>route(
payload -> route(payload),
spec -> spec
.transactional(transactionHandleMessageAdvice)
.subFlowMapping(
OperationType.New,
sf -> sf
.channel("creationChannel")
.log(Level.DEBUG, "Creation of a new order", Message::getPayload)
.transform(Mapper::mapCreate)
.handle(createOrderHandler,
handlerSpec -> handlerSpec.advice(retryOperationsInterceptor))
)
.subFlowMapping(
OperationType.Update,
sf -> sf
.channel("updateChannel")
.log(Level.DEBUG, "Update for existing order", Message::getPayload)
.transform(Mapper::mapUpdate)
.handle(updateOrderHandler,
handlerSpec -> handlerSpec.advice(retryOperationsInterceptor))
)
)
...some code here...
我尝试将“transactionhandlemessageadvice”配置为:
@Bean
TransactionHandleMessageAdvice transactionHandleMessageAdvice(MongoTransactionManager transactionManager) {
return new TransactionHandleMessageAdvice(transactionManager, new Properties());
}
但在处理程序异常失败后,仍将从数据库中删除消息。
也许我应该为子流配置poller,然后用mongotransactionmanager进行配置?
1条答案
按热度按时间dzjeubhm1#
也许我应该为子流配置poller,然后用mongotransactionmanager进行配置?
这是正确的假设。只要你有一条线在流动中移动(就像你的一样)
PollableChannel creationChannel
),则在将消息放入存储时提交当前事务。在当前线程中不会发生更多的事情,因此,在您开始的当前事务中也不会发生更多的事情.transactional(transactionHandleMessageAdvice)
.要使读取具有事务性,您确实必须配置
Poller
上.transform(Mapper::mapCreate)
终结点。因此,在您再次切换到不同的线程之前,来自该队列通道的每个轮询都将是事务性的。因为事务是绑定到
ThreadLocal
当调用堆栈返回到事务发起方时,它被提交或回滚。使用异步逻辑,我们只想从生产者端“发送并忘记”,让使用者在数据准备就绪时处理数据。这不是设计事务的目的。