kafka中的事务同步

sr4lhrrt  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(497)

我要将kafka事务与存储库事务同步:

@Transactional
public void syncTransaction(){
  myRepository.save(someObject)
  kafkaTemplate.send(someEvent)
}

合并后(https://github.com/spring-projects/spring-kafka/issues/373)医生说这是可能的。不过,我在理解和实现这个特性方面还存在一些问题。请看中的示例https://docs.spring.io/spring-kafka/reference/html/#transaction-同步我必须创建一个messagelistenercontainer来监听我自己的事件。我还必须使用kafkatemplate发送我的事件吗?messagelistenercontainer是否禁止发送到代理?
如果我正确理解了kafkatemplate和kafkatransactionmanager,那么我必须使用相同的producerfactory,在该producerfactory中,我必须启用事务设置transactionidprefix。在我的示例中,我必须将messagelistenercontainer的transactionmanager设置为datasourcetransactionmanager。对吗?
从我的Angular 来看,我通过kafkatemplate发送一个事件,听我自己的事件,然后再次使用kafkatemplate转发事件,这看起来很奇怪。
如果我能得到一个简单的kafka事务与存储库事务同步的例子和一个解释,我会很有帮助。

unftdfkk

unftdfkk1#

如果侦听器容器设置了 KafkaTransactionManager ,容器将创建一个生产者,该生产者将由任何下游Kafka模板使用,并且容器将为您发送事务的偏移量。
如果容器有其他事务管理器,则容器无法发送偏移量,因为它无权访问生产者(或模板)。
另一个解决方案是用 @Transactional (使用datasource tm)并使用kafka tm配置容器。
这样,您的db tx将在线程返回到容器之前提交,容器随后将向kafka事务发送偏移量并提交它。
有关示例,请参见框架测试用例。

jw5wzhpr

jw5wzhpr2#

@eike behrends要有一个db+kafka事务,可以使用 ChainedTransactionManager 并这样定义:

@Bean
public KafkaTransactionManager kafkaTransactionManager() {
    KafkaTransactionManager ktm = new KafkaTransactionManager(producerFactory());;
    ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    return ktm;
}

@Bean
@Primary
public JpaTransactionManager transactionManager(EntityManagerFactory em) {
    return new JpaTransactionManager(em);
}

@Bean(name = "chainedTransactionManager")
public ChainedTransactionManager chainedTransactionManager(JpaTransactionManager jpaTransactionManager,
                                                           KafkaTransactionManager kafkaTransactionManager) {
    return new ChainedTransactionManager(kafkaTransactionManager, jpaTransactionManager);
}

您需要注解事务db+kafka方法 @Transactional("chainedTransactionManager") (您可以在spring kafka项目上看到:https://github.com/spring-projects/spring-kafka/issues/433 )
你说:
从我的Angular 来看,我通过kafkatemplate发送一个事件,听我自己的事件,然后再次使用kafkatemplate转发事件,这看起来很奇怪。
你试过这个吗?如果是的话,你能举个例子吗?

相关问题