我要将kafka事务与存储库事务同步:
@Transactional
public void syncTransaction(){
myRepository.save(someObject)
kafkaTemplate.send(someEvent)
}
合并后(https://github.com/spring-projects/spring-kafka/issues/373)医生说这是可能的。不过,我在理解和实现这个特性方面还存在一些问题。请看中的示例https://docs.spring.io/spring-kafka/reference/html/#transaction-同步我必须创建一个messagelistenercontainer来监听我自己的事件。我还必须使用kafkatemplate发送我的事件吗?messagelistenercontainer是否禁止发送到代理?
如果我正确理解了kafkatemplate和kafkatransactionmanager,那么我必须使用相同的producerfactory,在该producerfactory中,我必须启用事务设置transactionidprefix。在我的示例中,我必须将messagelistenercontainer的transactionmanager设置为datasourcetransactionmanager。对吗?
从我的Angular 来看,我通过kafkatemplate发送一个事件,听我自己的事件,然后再次使用kafkatemplate转发事件,这看起来很奇怪。
如果我能得到一个简单的kafka事务与存储库事务同步的例子和一个解释,我会很有帮助。
2条答案
按热度按时间unftdfkk1#
如果侦听器容器设置了
KafkaTransactionManager
,容器将创建一个生产者,该生产者将由任何下游Kafka模板使用,并且容器将为您发送事务的偏移量。如果容器有其他事务管理器,则容器无法发送偏移量,因为它无权访问生产者(或模板)。
另一个解决方案是用
@Transactional
(使用datasource tm)并使用kafka tm配置容器。这样,您的db tx将在线程返回到容器之前提交,容器随后将向kafka事务发送偏移量并提交它。
有关示例,请参见框架测试用例。
jw5wzhpr2#
@eike behrends要有一个db+kafka事务,可以使用
ChainedTransactionManager
并这样定义:您需要注解事务db+kafka方法
@Transactional("chainedTransactionManager")
(您可以在spring kafka项目上看到:https://github.com/spring-projects/spring-kafka/issues/433 )你说:
从我的Angular 来看,我通过kafkatemplate发送一个事件,听我自己的事件,然后再次使用kafkatemplate转发事件,这看起来很奇怪。
你试过这个吗?如果是的话,你能举个例子吗?