SpringCloud Alibaba Seata处理分布式事务与执行原理

x33g5p2x  于2021-11-22 转载在 Spring  
字(11.0k)|赞(0)|评价(0)|浏览(646)

Seata处理分布式事务

分布式问题的出现

出现分不是之前 :单机单库没这个问题
出现分布式之后

我们分开的模块,原来模块都有独立的数据源,那么我如何保证一致性呢?
一次业务操作需要跨多个数据源或需要跨多个系统进行远程调用,就会产生分布式事务问题
这个时候就需要一套解决方案,那么 seata 营运而生

Seata简介

Seata是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务
官网地址 :http://seata.io/zh-cn/
下载地址 :https://github.com/seata/seata/releases

Seata能给我们带来什么

先介绍一下 seata 的三个核心组件和一个全局id

  • 全局唯一的事务ID 当开启事务就会生成xid,凭借这个id来标识是哪个事务

三个组件

  • Transaction Coordinator(TC) :事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚;
  • Transaction Manager™ : 控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议;
  • Resource Manager(RM) : 控制分支事务,负责分支注册,状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚;

举个例子 : 一个典型的分布式事务过程

  • TM向TC申请开启一个全局事务,
  • 全局事务创建成功并生成一个全局唯一的XID;XID在微服务调用链路的上下文中传播;
  • RM向TC注册分支事务,将其纳入XID对应全局事务的管辖;
  • TM向TC发起针对XID的全局提交或回滚决议;
  • TC调度XID下管辖的全部分支事务完成提交或回滚请求。

seata 安装和配置

我们首先需要有的环境

  • java 8
  • mysql
    演示版本为 0.9 版本, 1.0之后有变化,有升级需求查看官方文档升级

下载完成之后,进入到config文件夹中

我们可以创建副本吗,避免修改玩坏了,
这里我们配置 file 先
找到 service 模块 自定义名字
vgroup_mapping.fsp_tx_group = “default”

store模块

mode = “db”
url = “jdbc:mysql://127.0.0.1:3306/seata”
user = “root”
password = “你自己的密码”

配置玩之后,我们去配置 registry.conf,配置自己nacos的相关信息

registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = “nacos”
nacos {
serverAddr = “localhost:8848”
namespace = “”
cluster = “default”
}

之后去 数据库中 导入 seata的数据库脚本

启动即可

演示示例

准备环境

创建三个库

  • seata_order: 存储订单的数据库
  • seata_storage:存储库存的数据库
  • seata_account: 存储账户信息的数据库

建库sql

  1. CREATE DATABASE seata_order
  2. CREATE DATABASE seata_storage
  3. CREATE DATABASE seata_account

seata_order库下建t_order表

  1. CREATE TABLE t_order(
  2. `id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
  3. `user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id',
  4. `product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id',
  5. `count` INT(11) DEFAULT NULL COMMENT '数量',
  6. `money` DECIMAL(11,0) DEFAULT NULL COMMENT '金额',
  7. `status` INT(1) DEFAULT NULL COMMENT '订单状态:0:创建中; 1:已完结'
  8. ) ENGINE=INNODB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;
  9. SELECT * FROM t_order;

seata_storage库下建t_storage表

  1. CREATE TABLE t_storage(
  2. `id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
  3. `product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id',
  4. `total` INT(11) DEFAULT NULL COMMENT '总库存',
  5. `used` INT(11) DEFAULT NULL COMMENT '已用库存',
  6. `residue` INT(11) DEFAULT NULL COMMENT '剩余库存'
  7. ) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
  8. INSERT INTO seata_storage.t_storage(`id`,`product_id`,`total`,`used`,`residue`)
  9. VALUES('1','1','100','0','100');
  10. SELECT * FROM t_storage;

seata_account库下建t_account表

  1. CREATE TABLE t_account(
  2. `id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY COMMENT 'id',
  3. `user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id',
  4. `total` DECIMAL(10,0) DEFAULT NULL COMMENT '总额度',
  5. `used` DECIMAL(10,0) DEFAULT NULL COMMENT '已用余额',
  6. `residue` DECIMAL(10,0) DEFAULT '0' COMMENT '剩余可用额度'
  7. ) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
  8. INSERT INTO seata_account.t_account(`id`,`user_id`,`total`,`used`,`residue`) VALUES('1','1','1000','0','1000')
  9. SELECT * FROM t_account;

之后在三个数据库 创建 undo_log表

  1. drop table `undo_log`;
  2. CREATE TABLE `undo_log` (
  3. `id` bigint(20) NOT NULL AUTO_INCREMENT,
  4. `branch_id` bigint(20) NOT NULL,
  5. `xid` varchar(100) NOT NULL,
  6. `context` varchar(128) NOT NULL,
  7. `rollback_info` longblob NOT NULL,
  8. `log_status` int(11) NOT NULL,
  9. `log_created` datetime NOT NULL,
  10. `log_modified` datetime NOT NULL,
  11. `ext` varchar(100) DEFAULT NULL,
  12. PRIMARY KEY (`id`),
  13. UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
  14. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

最终效果

之后构建三个模块

新建订单Order-Module

引入依赖

  1. <dependencies>
  2. <!--nacos-->
  3. <dependency>
  4. <groupId>com.alibaba.cloud</groupId>
  5. <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
  6. </dependency>
  7. <!--seata-->
  8. <dependency>
  9. <groupId>com.alibaba.cloud</groupId>
  10. <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
  11. <exclusions>
  12. <exclusion>
  13. <artifactId>seata-all</artifactId>
  14. <groupId>io.seata</groupId>
  15. </exclusion>
  16. </exclusions>
  17. </dependency>
  18. <dependency>
  19. <groupId>io.seata</groupId>
  20. <artifactId>seata-all</artifactId>
  21. <version>0.9.0</version>
  22. </dependency>
  23. <!--feign-->
  24. <dependency>
  25. <groupId>org.springframework.cloud</groupId>
  26. <artifactId>spring-cloud-starter-openfeign</artifactId>
  27. </dependency>
  28. <!--web-actuator-->
  29. <dependency>
  30. <groupId>org.springframework.boot</groupId>
  31. <artifactId>spring-boot-starter-web</artifactId>
  32. </dependency>
  33. <dependency>
  34. <groupId>org.springframework.boot</groupId>
  35. <artifactId>spring-boot-starter-actuator</artifactId>
  36. </dependency>
  37. <!--mysql-druid-->
  38. <dependency>
  39. <groupId>mysql</groupId>
  40. <artifactId>mysql-connector-java</artifactId>
  41. <version>5.1.37</version>
  42. </dependency>
  43. <dependency>
  44. <groupId>com.alibaba</groupId>
  45. <artifactId>druid-spring-boot-starter</artifactId>
  46. <version>1.1.10</version>
  47. </dependency>
  48. <dependency>
  49. <groupId>org.mybatis.spring.boot</groupId>
  50. <artifactId>mybatis-spring-boot-starter</artifactId>
  51. <version>2.0.0</version>
  52. </dependency>
  53. <dependency>
  54. <groupId>org.springframework.boot</groupId>
  55. <artifactId>spring-boot-starter-test</artifactId>
  56. <scope>test</scope>
  57. </dependency>
  58. <dependency>
  59. <groupId>org.projectlombok</groupId>
  60. <artifactId>lombok</artifactId>
  61. <optional>true</optional>
  62. </dependency>
  63. </dependencies>

配置文件

  1. server:
  2. port: 2001
  3. spring:
  4. application:
  5. name: seata-order-service
  6. cloud:
  7. alibaba:
  8. seata:
  9. #自定义事务组名称需要与seata-server中的对应
  10. tx-service-group: fsp_tx_group
  11. nacos:
  12. discovery:
  13. server-addr: localhost:8848
  14. datasource:
  15. driver-class-name: com.mysql.jdbc.Driver
  16. url: jdbc:mysql://localhost:3306/seata_order
  17. username: root
  18. password: 1111111
  19. feign:
  20. hystrix:
  21. enabled: false
  22. logging:
  23. level:
  24. io:
  25. seata: info
  26. mybatis:
  27. mapperLocations: classpath:mapper/*.xml

之后将 .file.confregistry.conf两个文件放入到项目resource文件夹中
创建实体类
这里业务类我们使用的是 openfeign来作为调用的组件

  • OrderService
  1. public interface OrderService{
  2. void create(Order order);
  3. }

实现类
这里我们使用
@GlobalTransactional(name = "fsp-create-order",rollbackFor = Exception.class)开启全局注解

name是我们的事务名字
之后回掉方法是异常class

  1. @Service
  2. @Slf4j
  3. public class OrderServiceImpl implements OrderService
  4. {
  5. @Resource
  6. private OrderDao orderDao;
  7. @Resource
  8. private StorageService storageService;
  9. @Resource
  10. private AccountService accountService;
  11. /** * 创建订单->调用库存服务扣减库存->调用账户服务扣减账户余额->修改订单状态 */
  12. @Override
  13. @GlobalTransactional(name = "fsp-create-order",rollbackFor = Exception.class)
  14. public void create(Order order){
  15. log.info("----->开始新建订单");
  16. //新建订单
  17. orderDao.create(order);
  18. //扣减库存
  19. log.info("----->订单微服务开始调用库存,做扣减Count");
  20. storageService.decrease(order.getProductId(),order.getCount());
  21. log.info("----->订单微服务开始调用库存,做扣减end");
  22. //扣减账户
  23. log.info("----->订单微服务开始调用账户,做扣减Money");
  24. accountService.decrease(order.getUserId(),order.getMoney());
  25. log.info("----->订单微服务开始调用账户,做扣减end");
  26. //修改订单状态,从零到1代表已经完成
  27. log.info("----->修改订单状态开始");
  28. orderDao.update(order.getUserId(),0);
  29. log.info("----->修改订单状态结束");
  30. log.info("----->下订单结束了");
  31. }
  32. }
  • StorageService
  1. @FeignClient(value = "seata-storage-service")
  2. public interface StorageService{
  3. @PostMapping(value = "/storage/decrease")
  4. CommonResult decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count);
  5. }
  • AccountService
  1. @FeignClient(value = "seata-account-service")
  2. public interface AccountService{
  3. @PostMapping(value = "/account/decrease")
  4. CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money);
  5. }
  • MyBatisConfig
  1. @Configuration
  2. @MapperScan({"com.atguigu.springcloud.alibaba.dao"})
  3. public class MyBatisConfig {
  4. }
  • DataSourceProxyConfig(配置seata的代理,)
  1. @Configuration
  2. public class DataSourceProxyConfig {
  3. @Value("${mybatis.mapperLocations}")
  4. private String mapperLocations;
  5. @Bean
  6. @ConfigurationProperties(prefix = "spring.datasource")
  7. public DataSource druidDataSource(){
  8. return new DruidDataSource();
  9. }
  10. @Bean
  11. public DataSourceProxy dataSourceProxy(DataSource dataSource) {
  12. return new DataSourceProxy(dataSource);
  13. }
  14. @Bean
  15. public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
  16. SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
  17. sqlSessionFactoryBean.setDataSource(dataSourceProxy);
  18. sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
  19. sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
  20. return sqlSessionFactoryBean.getObject();
  21. }
  22. }

新建库存Storage-Module

pom基本一致,就不重复复制了,直接上配置

  1. server:
  2. port: 2002
  3. spring:
  4. application:
  5. name: seata-storage-service
  6. cloud:
  7. alibaba:
  8. seata:
  9. tx-service-group: fsp_tx_group
  10. nacos:
  11. discovery:
  12. server-addr: localhost:8848
  13. datasource:
  14. driver-class-name: com.mysql.jdbc.Driver
  15. url: jdbc:mysql://localhost:3306/seata_storage
  16. username: root
  17. password: 111111
  18. logging:
  19. level:
  20. io:
  21. seata: info
  22. mybatis:
  23. mapperLocations: classpath:mapper/*.xml

之后将 .file.confregistry.conf两个文件放入到项目resource文件夹中
这里就略过实体类了,直接实现类,主要感受seata带来的事务

  1. @Service
  2. public class StorageServiceImpl implements StorageService {
  3. private static final Logger LOGGER = LoggerFactory.getLogger(StorageServiceImpl.class);
  4. @Resource
  5. private StorageDao storageDao;
  6. // 扣减库存
  7. @Override
  8. public void decrease(Long productId, Integer count) {
  9. LOGGER.info("------->storage-service中扣减库存开始");
  10. storageDao.decrease(productId,count);
  11. LOGGER.info("------->storage-service中扣减库存结束");
  12. }
  13. }

如下两个配置与模块一一致

  • MyBatisConfig
  • DataSourceProxyConfig

新建账户Account-Module

配置文件

  1. server:
  2. port: 2003
  3. spring:
  4. application:
  5. name: seata-account-service
  6. cloud:
  7. alibaba:
  8. seata:
  9. tx-service-group: fsp_tx_group
  10. nacos:
  11. discovery:
  12. server-addr: localhost:8848
  13. datasource:
  14. driver-class-name: com.mysql.jdbc.Driver
  15. url: jdbc:mysql://localhost:3306/seata_account
  16. username: root
  17. password: 1111111
  18. feign:
  19. hystrix:
  20. enabled: false
  21. logging:
  22. level:
  23. io:
  24. seata: info
  25. mybatis:
  26. mapperLocations: classpath:mapper/*.xml

之后将 .file.confregistry.conf两个文件放入到项目resource文件夹中

实现类

  1. /** * 账户业务实现类 */
  2. @Service
  3. public class AccountServiceImpl implements AccountService {
  4. private static final Logger LOGGER = LoggerFactory.getLogger(AccountServiceImpl.class);
  5. @Resource
  6. AccountDao accountDao;
  7. /** * 扣减账户余额 */
  8. @Override
  9. public void decrease(Long userId, BigDecimal money) {
  10. LOGGER.info("------->account-service中扣减账户余额开始");
  11. try { TimeUnit.SECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); }
  12. accountDao.decrease(userId,money);
  13. LOGGER.info("------->account-service中扣减账户余额结束");
  14. }
  15. }

这里我们设置超时,之后去创建订单,看看是否成功

http://localhost:2001/order/create?userId=1&productId=1&count=10&money=100
我们会发现,数据库没变化,出现超时异常,立刻就回滚了

当我们把超时代码注释,再次创建订单,数据库的值就变化了

只是配置了一下,添加了个全局事务注解,就可以实现分布式事务了,这其中seata做了什么呢?

seata原理

分布式事务的执行流程

  • TM开启分布式事务(TM向TC注册全局事务记录)
  • 换业务场景,编排数据库,服务等事务内资源(RM向TC汇报资源准备状态)
  • TM结束分布式事务,事务一阶段结束(TM通知TC提交/回滚分布式事务)
  • TC汇总事务信息,决定分布式事务是提交还是回滚
  • TC通知所有RM提交/回滚资源,事务二阶段结束。

seata的几种模式

AT模式如何做到对业务的无侵入

一阶段

在一阶段,Seata 会拦截“业务SQL”,

  1. 解析SQL语义,找到“业务SQL”要更新的业务数据,在业务数据被更新前,将其保存成“before image’
  2. 执行“业务SQL”更新业务数据,在业务数据更新之后,
  3. 其保存成“after image”,最后生成行锁。

以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性。

二阶段

提交

回滚

二阶段如果是回滚的话,Seata就需要回滚一阶段已经执行的“业务SQL”,还原业务数据。
回滚方式便是用“before image”还原业务数据;但在还原前要首先要校验脏写,对比“数据库当前业务数据”和“after image"如果两份数据完全一致就说明没有脏写,可以还原业务数据,如果不一致就说明有脏写,出现脏写就需要转人工处理。

完整过程流程图

总结

seata分布式事务解决方案,我们微服务的事务发生了质变,原本只有单机和单数据源,开启事务
而多个数据源多个模块的事务开启,seata回去寻找在一个事务里的数据源,拦截sql去做一系列的处理
解决了单机单数据源一致性的业务痛点。 一个字 猛!

相关文章