SpringCloud Alibaba Seata处理分布式事务与执行原理

x33g5p2x  于2021-11-22 转载在 Spring  
字(11.0k)|赞(0)|评价(0)|浏览(623)

Seata处理分布式事务

分布式问题的出现

出现分不是之前 :单机单库没这个问题
出现分布式之后

我们分开的模块,原来模块都有独立的数据源,那么我如何保证一致性呢?
一次业务操作需要跨多个数据源或需要跨多个系统进行远程调用,就会产生分布式事务问题
这个时候就需要一套解决方案,那么 seata 营运而生

Seata简介

Seata是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务
官网地址 :http://seata.io/zh-cn/
下载地址 :https://github.com/seata/seata/releases

Seata能给我们带来什么

先介绍一下 seata 的三个核心组件和一个全局id

  • 全局唯一的事务ID 当开启事务就会生成xid,凭借这个id来标识是哪个事务

三个组件

  • Transaction Coordinator(TC) :事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚;
  • Transaction Manager™ : 控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议;
  • Resource Manager(RM) : 控制分支事务,负责分支注册,状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚;

举个例子 : 一个典型的分布式事务过程

  • TM向TC申请开启一个全局事务,
  • 全局事务创建成功并生成一个全局唯一的XID;XID在微服务调用链路的上下文中传播;
  • RM向TC注册分支事务,将其纳入XID对应全局事务的管辖;
  • TM向TC发起针对XID的全局提交或回滚决议;
  • TC调度XID下管辖的全部分支事务完成提交或回滚请求。

seata 安装和配置

我们首先需要有的环境

  • java 8
  • mysql
    演示版本为 0.9 版本, 1.0之后有变化,有升级需求查看官方文档升级

下载完成之后,进入到config文件夹中

我们可以创建副本吗,避免修改玩坏了,
这里我们配置 file 先
找到 service 模块 自定义名字
vgroup_mapping.fsp_tx_group = “default”

store模块

mode = “db”
url = “jdbc:mysql://127.0.0.1:3306/seata”
user = “root”
password = “你自己的密码”

配置玩之后,我们去配置 registry.conf,配置自己nacos的相关信息

registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = “nacos”
nacos {
serverAddr = “localhost:8848”
namespace = “”
cluster = “default”
}

之后去 数据库中 导入 seata的数据库脚本

启动即可

演示示例

准备环境

创建三个库

  • seata_order: 存储订单的数据库
  • seata_storage:存储库存的数据库
  • seata_account: 存储账户信息的数据库

建库sql

CREATE DATABASE seata_order;
 
CREATE DATABASE seata_storage;
 
CREATE DATABASE seata_account;

seata_order库下建t_order表

CREATE TABLE t_order(
    `id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
    `user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id',
    `product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id',
    `count` INT(11) DEFAULT NULL COMMENT '数量',
    `money` DECIMAL(11,0) DEFAULT NULL COMMENT '金额',
    `status` INT(1) DEFAULT NULL COMMENT '订单状态:0:创建中; 1:已完结'
) ENGINE=INNODB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;
 
SELECT * FROM t_order;

seata_storage库下建t_storage表

CREATE TABLE t_storage(
    `id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
    `product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id',
    `total` INT(11) DEFAULT NULL COMMENT '总库存',
    `used` INT(11) DEFAULT NULL COMMENT '已用库存',
    `residue` INT(11) DEFAULT NULL COMMENT '剩余库存'
) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
 
INSERT INTO seata_storage.t_storage(`id`,`product_id`,`total`,`used`,`residue`)
VALUES('1','1','100','0','100');
 
 
SELECT * FROM t_storage;

seata_account库下建t_account表

CREATE TABLE t_account(
    `id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY COMMENT 'id',
    `user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id',
    `total` DECIMAL(10,0) DEFAULT NULL COMMENT '总额度',
    `used` DECIMAL(10,0) DEFAULT NULL COMMENT '已用余额',
    `residue` DECIMAL(10,0) DEFAULT '0' COMMENT '剩余可用额度'
) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
 
INSERT INTO seata_account.t_account(`id`,`user_id`,`total`,`used`,`residue`) VALUES('1','1','1000','0','1000')
 
 
 
SELECT * FROM t_account;

之后在三个数据库 创建 undo_log表

drop table `undo_log`;
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

最终效果

之后构建三个模块

新建订单Order-Module

引入依赖

<dependencies>
        <!--nacos-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <!--seata-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>seata-all</artifactId>
                    <groupId>io.seata</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-all</artifactId>
            <version>0.9.0</version>
        </dependency>
        <!--feign-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
        <!--web-actuator-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!--mysql-druid-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.37</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.10</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

配置文件

server:
  port: 2001
 
spring:
  application:
    name: seata-order-service
  cloud:
    alibaba:
      seata:
        #自定义事务组名称需要与seata-server中的对应
        tx-service-group: fsp_tx_group
    nacos:
      discovery:
        server-addr: localhost:8848
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/seata_order
    username: root
    password: 1111111
 
feign:
  hystrix:
    enabled: false
 
logging:
  level:
    io:
      seata: info
 
mybatis:
  mapperLocations: classpath:mapper/*.xml

之后将 .file.confregistry.conf两个文件放入到项目resource文件夹中
创建实体类
这里业务类我们使用的是 openfeign来作为调用的组件

  • OrderService
public interface OrderService{
    void create(Order order);
}

实现类
这里我们使用
@GlobalTransactional(name = "fsp-create-order",rollbackFor = Exception.class)开启全局注解

name是我们的事务名字
之后回掉方法是异常class

@Service
@Slf4j
public class OrderServiceImpl implements OrderService
{
    @Resource
    private OrderDao orderDao;
    @Resource
    private StorageService storageService;
    @Resource
    private AccountService accountService;
 
    /** * 创建订单->调用库存服务扣减库存->调用账户服务扣减账户余额->修改订单状态 */
     
    @Override
    @GlobalTransactional(name = "fsp-create-order",rollbackFor = Exception.class)
    public void create(Order order){
        log.info("----->开始新建订单");
        //新建订单
        orderDao.create(order);
 
        //扣减库存
        log.info("----->订单微服务开始调用库存,做扣减Count");
        storageService.decrease(order.getProductId(),order.getCount());
        log.info("----->订单微服务开始调用库存,做扣减end");
 
        //扣减账户
        log.info("----->订单微服务开始调用账户,做扣减Money");
        accountService.decrease(order.getUserId(),order.getMoney());
        log.info("----->订单微服务开始调用账户,做扣减end");
 
         
        //修改订单状态,从零到1代表已经完成
        log.info("----->修改订单状态开始");
        orderDao.update(order.getUserId(),0);
        log.info("----->修改订单状态结束");
 
        log.info("----->下订单结束了");
 
    }
}
  • StorageService
@FeignClient(value = "seata-storage-service")
public interface StorageService{
   @PostMapping(value = "/storage/decrease")
   CommonResult decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count);
}
  • AccountService
@FeignClient(value = "seata-account-service")
public interface AccountService{
   @PostMapping(value = "/account/decrease")
   CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money);
}
  • MyBatisConfig
@Configuration
@MapperScan({"com.atguigu.springcloud.alibaba.dao"})
public class MyBatisConfig {
}
  • DataSourceProxyConfig(配置seata的代理,)
@Configuration
public class DataSourceProxyConfig {
 
    @Value("${mybatis.mapperLocations}")
    private String mapperLocations;
 
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource druidDataSource(){
        return new DruidDataSource();
    }
 
    @Bean
    public DataSourceProxy dataSourceProxy(DataSource dataSource) {
        return new DataSourceProxy(dataSource);
    }
 
    @Bean
    public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSourceProxy);
        sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
        sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
        return sqlSessionFactoryBean.getObject();
    }
 
}

新建库存Storage-Module

pom基本一致,就不重复复制了,直接上配置

server:
  port: 2002
 
spring:
  application:
    name: seata-storage-service
  cloud:
    alibaba:
      seata:
        tx-service-group: fsp_tx_group
    nacos:
      discovery:
        server-addr: localhost:8848
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/seata_storage
    username: root
    password: 111111
 
logging:
  level:
    io:
      seata: info
 
mybatis:
  mapperLocations: classpath:mapper/*.xml

之后将 .file.confregistry.conf两个文件放入到项目resource文件夹中
这里就略过实体类了,直接实现类,主要感受seata带来的事务

@Service
public class StorageServiceImpl implements StorageService {
 
    private static final Logger LOGGER = LoggerFactory.getLogger(StorageServiceImpl.class);
 
    @Resource
    private StorageDao storageDao;
 
     // 扣减库存
    @Override
    public void decrease(Long productId, Integer count) {
        LOGGER.info("------->storage-service中扣减库存开始");
        storageDao.decrease(productId,count);
        LOGGER.info("------->storage-service中扣减库存结束");
    }
}

如下两个配置与模块一一致

  • MyBatisConfig
  • DataSourceProxyConfig

新建账户Account-Module

配置文件

server:
  port: 2003
 
spring:
  application:
    name: seata-account-service
  cloud:
    alibaba:
      seata:
        tx-service-group: fsp_tx_group
    nacos:
      discovery:
        server-addr: localhost:8848
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/seata_account
    username: root
    password: 1111111
 
feign:
  hystrix:
    enabled: false
 
logging:
  level:
    io:
      seata: info
 
mybatis:
  mapperLocations: classpath:mapper/*.xml

之后将 .file.confregistry.conf两个文件放入到项目resource文件夹中

实现类

/** * 账户业务实现类 */
@Service
public class AccountServiceImpl implements AccountService {
 
    private static final Logger LOGGER = LoggerFactory.getLogger(AccountServiceImpl.class);
 
 
    @Resource
    AccountDao accountDao;
 
    /** * 扣减账户余额 */
    @Override
    public void decrease(Long userId, BigDecimal money) {
        
         LOGGER.info("------->account-service中扣减账户余额开始");
        try { TimeUnit.SECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); }
        accountDao.decrease(userId,money);
        LOGGER.info("------->account-service中扣减账户余额结束");
    }
}

这里我们设置超时,之后去创建订单,看看是否成功

http://localhost:2001/order/create?userId=1&productId=1&count=10&money=100
我们会发现,数据库没变化,出现超时异常,立刻就回滚了

当我们把超时代码注释,再次创建订单,数据库的值就变化了

只是配置了一下,添加了个全局事务注解,就可以实现分布式事务了,这其中seata做了什么呢?

seata原理

分布式事务的执行流程

  • TM开启分布式事务(TM向TC注册全局事务记录)
  • 换业务场景,编排数据库,服务等事务内资源(RM向TC汇报资源准备状态)
  • TM结束分布式事务,事务一阶段结束(TM通知TC提交/回滚分布式事务)
  • TC汇总事务信息,决定分布式事务是提交还是回滚
  • TC通知所有RM提交/回滚资源,事务二阶段结束。

seata的几种模式

AT模式如何做到对业务的无侵入

一阶段

在一阶段,Seata 会拦截“业务SQL”,

  1. 解析SQL语义,找到“业务SQL”要更新的业务数据,在业务数据被更新前,将其保存成“before image’
  2. 执行“业务SQL”更新业务数据,在业务数据更新之后,
  3. 其保存成“after image”,最后生成行锁。

以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性。

二阶段

提交

回滚

二阶段如果是回滚的话,Seata就需要回滚一阶段已经执行的“业务SQL”,还原业务数据。
回滚方式便是用“before image”还原业务数据;但在还原前要首先要校验脏写,对比“数据库当前业务数据”和“after image"如果两份数据完全一致就说明没有脏写,可以还原业务数据,如果不一致就说明有脏写,出现脏写就需要转人工处理。

完整过程流程图

总结

seata分布式事务解决方案,我们微服务的事务发生了质变,原本只有单机和单数据源,开启事务
而多个数据源多个模块的事务开启,seata回去寻找在一个事务里的数据源,拦截sql去做一系列的处理
解决了单机单数据源一致性的业务痛点。 一个字 猛!

相关文章