我有三个不同的来源,我想发送信息:
- Oracle数据库,与JpaRepository相关。
- MongoDB,与MongoRepository相关。
1.还有一个Kafka主题,它通过yml配置了生成该主题所需的所有数据。
我想将这些资源与Spring Tx同步。我有一个与Oracle db相关的JpaTransactionManager,另一个与MongoDB相关的MongoTransactionManager,以及Spring在激活事务时自动创建的KafkaTransactionManager。
我的想法是这样的:
1.首先,我需要一个基本的控制器来自动连接一个服务:
@RestController
public class SimpleController {
@Autowired
private SimpleService simpleService;
@PostMapping(...)
public Response doTx () {
simpleService.doTx();
}
1.然后,该服务将同时自动连接另外三个服务,每个服务对应一个资源:
@Service
public class SimpleService {
@Autowired
private OracleService oracleService;
@Autowired
private MongoService mongoService;
@Autowired
private KafkaService kafkaService;
...
}
1.这样的SimpleService将有一个用@Transactional annotation标注的方法,在其中调用这些服务:
@Service
public class SimpleService {
@Transactional
public void doTx () {
oracleService.doTx();
mongoService.doTx();
kafkaService.doTx();
}
}
1.而且,对这些方法的每个调用也将使用@Transactional annotation进行注解:
public class OracleService {
@Autowired
JpaRepository jpaRepository
@Transactional
public void doTx(){
jpaRepository.save();
}
}
我希望的是,在SimpleService中启动一个事务,我调用的每个其他服务,加入这样的事务,并在方法即将返回时一个接一个地提交。
但这不管用。它“工作”的唯一方式(我说“工作”是因为它不允许我完全自定义提交资源的顺序)是将所有资源放在一个服务中,所有资源都从同一个方法调用:
@Service
public class SimpleService {
@Autowired
private JpaRepository oracleJpaRepository;
@Autowired
private MongoRepository mongoRepository;
@Autowired
private KafkaTemplate kafkaTemplate;
@Transactional(transactionManager = "oracleJpaTransactionManager")
public void doTx() {
oracleJpaRepository.save();
mongoRepository.save();
kafkaTemplate.send();
}
因此,基本上,问题是,如果我的第一种方法是有效的方法(我认为它是有效的,因为它出现在文档中包含的一些Spring Kafka代码示例中)以及如何使其工作(没有被弃用的ChainTransactionManager
)。
所有的帮助是赞赏:D!
我尝试用上面的方法同步事务,但它只会在调用该方法时提交,而不会加入前一个事务。
编辑:我并不试图实现完全的交易性。只是同步,尽力而为1阶段提交。我知道在同步这些资源的时候可能会有一些数据不一致,补偿不是问题,关键是在方法的末尾链接提交。
编辑2:
Spring Kafka文档:https://docs.spring.io/spring-kafka/reference/html/#ex-jdbc-sync
@Transactional("dstm")
public void someMethod(String in) {
this.jdbcTemplate.execute("insert into mytable (data) values ('" + in + "')");
sendToKafka(in);
}
@Transactional("kafkaTransactionManager")
public void sendToKafka(String in) {
this.kafkaTemplate.send("topic2", in.toUpperCase());
}
下面的示例显示了如何嵌套事务以及它们是否同步。
1条答案
按热度按时间mwngjboj1#
因此,短期内,与多个数据库和Kafka进行事务同步是行不通的。
在其全部容量下的事务同步看起来只适用于已弃用的Spring事务类
ChainedTransactionManager
,它确实在方法的末尾同步了使用ChainedTransactionManager
中指定的事务管理器进行的所有操作。对Transactional注解方法的嵌套调用不会同步。
数据库和Kafka只有在 * 只有一个数据库和Kafka放在一起 * 的情况下才会同步,因为问题的注解指出一旦第二个数据库进入游戏,数据库就会中断。
您可以使用另一种模式(Transactional Outbox在这些情况下听起来很可靠,但它带有更高的数据不一致性时间框架),或者将ChainedTransactionManager类复制粘贴到您的库/示例中并使用它。
请注意,一旦提交失败,补偿将不得不手动完成,因为这不是XA或类似的。因此,如果您同步多个数据库和Kafka,请确保捕获启发式异常并适当补偿。
如果您了解使用
ChainedTransactionManager
的缺点,那么当您的应用程序需要将所有资源提交在一起而没有业务逻辑时,它是一个很好的工具来获得这种同步。不要期待两阶段提交,这是一个尽力而为的1阶段提交。所以当一次提交失败时,所有之前的提交都不会回滚,你必须手动补偿。