Seata TCC模式-TCC模式

x33g5p2x  于2021-10-28 转载在 其他  
字(25.5k)|赞(0)|评价(0)|浏览(737)

项目源码: https://gitee.com/benwang6/seata-samples

一.TCC 基本原理

TCC 与 Seata AT 事务一样都是两阶段事务,它与 AT 事务的主要区别为:

TCC 对业务代码侵入严重
每个阶段的数据操作都要自己进行编码来实现,事务框架无法自动处理。
TCC 效率更高
不必对数据加全局锁,允许多个事务同时操作数据。

第一阶段 Try

以账户服务为例,当下订单时要扣减用户账户金额:

假如用户购买 100 元商品,要扣减 100 元。

TCC 事务首先对这100元的扣减金额进行预留,或者说是先冻结这100元:

第二阶段 Confirm

如果第一阶段能够顺利完成,那么说明“扣减金额”业务(分支事务)最终肯定是可以成功的。当全局事务提交时, TC会控制当前分支事务进行提交,如果提交失败,TC 会反复尝试,直到提交成功为止。

当全局事务提交时,就可以使用冻结的金额来最终实现业务数据操作:

第二阶段 Cancel

如果全局事务回滚,就把冻结的金额进行解冻,恢复到以前的状态,TC 会控制当前分支事务回滚,如果回滚失败,TC 会反复尝试,直到回滚完成为止。

多个事务并发的情况

多个TCC全局事务允许并发,它们执行扣减金额时,只需要冻结各自的金额即可:

Seata TCC事务模式

Seata 支持 TCC 事务模式,与 AT 模式相同的,也需要以下组件来支持全局事务的控制:

  • TC 事务协调器
  • TM 事务管理器
  • RM 资源管理器

下一节,我们还是以订单业务为例,来演示 Seata 如何实现 TCC 事务。

二.准备订单项目案例

项目源码:

1.新建工程,导入无事务版本

1. 新建empty工程 seata-tcc

工程命名为 seata-tcc,存放到 seata-samples 文件夹下,与 seata-at 工程存放在一起:

2. 导入订单项目,无事务版本

无事务版本的4个文件夹,解压缩到工程目录
1.下载项目代码
2.访问 git 仓库
访问项目标签

下载无事务版

1.解压到 seata-tcc 目录

压缩文件中的 7 个项目目录解压缩到 seata-tcc 目录:

2. 导入项目
  1. - project structure 中添加moudle
  2. - double shift搜索add maven project

然后选择 seata-tcc 工程目录下的 7 个项目的 pom.xml 导入:

拖拽pom.xml文件,右键add as maven project

3.order启动全局事务,添加“保存订单”分支事务

在订单项目中执行添加订单:

我们要添加以下 TCC 事务操作的代码:

  1. Try - 第一阶,冻结数据阶段,向订单表直接插入订单,订单状态设置为0(冻结状态)。

Confirm - 第二阶段,提交事务,将订单状态修改成1(正常状态)。

Cancel - 第二阶段,回滚事务,删除订单。

1.order-parent 添加 seata 依赖,

打开 order-parent 中注释掉的 seata 依赖:

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>2.3.2.RELEASE</version>
  9. <relativePath/> <!-- lookup parent from repository -->
  10. </parent>
  11. <groupId>cn.tedu</groupId>
  12. <artifactId>order-parent</artifactId>
  13. <version>1.0-SNAPSHOT</version>
  14. <packaging>pom</packaging>
  15. <name>order-parent</name>
  16. <modules>
  17. <module>account</module>
  18. <module>storage</module>
  19. <module>order</module>
  20. </modules>
  21. <properties>
  22. <mybatis-plus.version>3.3.2</mybatis-plus.version>
  23. <druid-spring-boot-starter.version>1.1.23</druid-spring-boot-starter.version>
  24. <seata.version>1.3.0</seata.version>
  25. <spring-cloud-alibaba-seata.version>2.0.0.RELEASE</spring-cloud-alibaba-seata.version>
  26. <spring-cloud.version>Hoxton.SR12</spring-cloud.version>
  27. </properties>
  28. <dependencies>
  29. <dependency>
  30. <groupId>org.springframework.boot</groupId>
  31. <artifactId>spring-boot-starter</artifactId>
  32. </dependency>
  33. <dependency>
  34. <groupId>org.springframework.boot</groupId>
  35. <artifactId>spring-boot-starter-web</artifactId>
  36. </dependency>
  37. <dependency>
  38. <groupId>org.springframework.cloud</groupId>
  39. <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
  40. </dependency>
  41. <dependency>
  42. <groupId>org.springframework.cloud</groupId>
  43. <artifactId>spring-cloud-starter-openfeign</artifactId>
  44. </dependency>
  45. <dependency>
  46. <groupId>com.baomidou</groupId>
  47. <artifactId>mybatis-plus-boot-starter</artifactId>
  48. <version>${mybatis-plus.version}</version>
  49. </dependency>
  50. <dependency>
  51. <groupId>mysql</groupId>
  52. <artifactId>mysql-connector-java</artifactId>
  53. </dependency>
  54. <dependency>
  55. <groupId>com.alibaba</groupId>
  56. <artifactId>druid-spring-boot-starter</artifactId>
  57. <version>${druid-spring-boot-starter.version}</version>
  58. </dependency>
  59. <dependency>
  60. <groupId>com.alibaba.cloud</groupId>
  61. <artifactId>spring-cloud-alibaba-seata</artifactId>
  62. <version>${spring-cloud-alibaba-seata.version}</version>
  63. <exclusions>
  64. <exclusion>
  65. <artifactId>seata-all</artifactId>
  66. <groupId>io.seata</groupId>
  67. </exclusion>
  68. </exclusions>
  69. </dependency>
  70. <dependency>
  71. <groupId>io.seata</groupId>
  72. <artifactId>seata-all</artifactId>
  73. <version>${seata.version}</version>
  74. </dependency>
  75. <dependency>
  76. <groupId>org.projectlombok</groupId>
  77. <artifactId>lombok</artifactId>
  78. </dependency>
  79. <dependency>
  80. <groupId>org.springframework.boot</groupId>
  81. <artifactId>spring-boot-starter-test</artifactId>
  82. <scope>test</scope>
  83. <exclusions>
  84. <exclusion>
  85. <groupId>org.junit.vintage</groupId>
  86. <artifactId>junit-vintage-engine</artifactId>
  87. </exclusion>
  88. </exclusions>
  89. </dependency>
  90. </dependencies>
  91. <dependencyManagement>
  92. <dependencies>
  93. <dependency>
  94. <groupId>org.springframework.cloud</groupId>
  95. <artifactId>spring-cloud-dependencies</artifactId>
  96. <version>${spring-cloud.version}</version>
  97. <type>pom</type>
  98. <scope>import</scope>
  99. </dependency>
  100. </dependencies>
  101. </dependencyManagement>
  102. <build>
  103. <plugins>
  104. <plugin>
  105. <groupId>org.springframework.boot</groupId>
  106. <artifactId>spring-boot-maven-plugin</artifactId>
  107. </plugin>
  108. </plugins>
  109. </build>
  110. </project>
2. 三个配置文件
1.application.yml --事务组
  1. spring:
  2. application:
  3. name: order
  4. datasource:
  5. driver-class-name: com.mysql.cj.jdbc.Driver
  6. url: jdbc:mysql://localhost/seata_order?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8
  7. username: root
  8. password: root
  9. cloud:
  10. alibaba:
  11. seata:
  12. tx-service-group: order_tx_group
  13. server:
  14. port: 8083
  15. eureka:
  16. client:
  17. service-url:
  18. defaultZone: http://localhost:8761/eureka
  19. instance:
  20. prefer-ip-address: true
  21. instance-id: ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port}
  22. mybatis-plus:
  23. type-aliases-package: cn.tedu.order.entity
  24. mapper-locations:
  25. - classpath:/mapper/*Mapper.xml configuration: map-underscore-to-camel-case: true logging: level: cn.tedu.order.mapper: DEBUG ribbon: MaxAutoRetriesNextServer: 0 #默认1
2. registry.conf – 注册中心的地址
  1. registry {
  2. # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  3. type = "eureka"
  4. nacos {
  5. serverAddr = "localhost"
  6. namespace = ""
  7. cluster = "default"
  8. }
  9. eureka {
  10. # 连接eureka,要从注册表发现 seata-server
  11. serviceUrl = "http://localhost:8761/eureka"
  12. # application = "default"
  13. # weight = "1"
  14. }
  15. redis {
  16. serverAddr = "localhost:6379"
  17. db = "0"
  18. password = ""
  19. cluster = "default"
  20. timeout = "0"
  21. }
  22. zk {
  23. cluster = "default"
  24. serverAddr = "127.0.0.1:2181"
  25. session.timeout = 6000
  26. connect.timeout = 2000
  27. username = ""
  28. password = ""
  29. }
  30. consul {
  31. cluster = "default"
  32. serverAddr = "127.0.0.1:8500"
  33. }
  34. etcd3 {
  35. cluster = "default"
  36. serverAddr = "http://localhost:2379"
  37. }
  38. sofa {
  39. serverAddr = "127.0.0.1:9603"
  40. application = "default"
  41. region = "DEFAULT_ZONE"
  42. datacenter = "DefaultDataCenter"
  43. cluster = "default"
  44. group = "SEATA_GROUP"
  45. addressWaitTime = "3000"
  46. }
  47. file {
  48. name = "file.conf"
  49. }
  50. }
  51. config {
  52. # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
  53. type = "file"
  54. nacos {
  55. serverAddr = "localhost"
  56. namespace = ""
  57. group = "SEATA_GROUP"
  58. }
  59. consul {
  60. serverAddr = "127.0.0.1:8500"
  61. }
  62. apollo {
  63. app.id = "seata-server"
  64. apollo.meta = "http://192.168.1.204:8801"
  65. namespace = "application"
  66. }
  67. zk {
  68. serverAddr = "127.0.0.1:2181"
  69. session.timeout = 6000
  70. connect.timeout = 2000
  71. username = ""
  72. password = ""
  73. }
  74. etcd3 {
  75. serverAddr = "http://localhost:2379"
  76. }
  77. file {
  78. name = "file.conf"
  79. }
  80. }
3.file.conf –
  1. transport {
  2. # tcp udt unix-domain-socket
  3. type = "TCP"
  4. #NIO NATIVE
  5. server = "NIO"
  6. #enable heartbeat
  7. heartbeat = true
  8. # the client batch send request enable
  9. enableClientBatchSendRequest = true
  10. #thread factory for netty
  11. threadFactory {
  12. bossThreadPrefix = "NettyBoss"
  13. workerThreadPrefix = "NettyServerNIOWorker"
  14. serverExecutorThread-prefix = "NettyServerBizHandler"
  15. shareBossWorker = false
  16. clientSelectorThreadPrefix = "NettyClientSelector"
  17. clientSelectorThreadSize = 1
  18. clientWorkerThreadPrefix = "NettyClientWorkerThread"
  19. # netty boss thread size,will not be used for UDT
  20. bossThreadSize = 1
  21. #auto default pin or 8
  22. workerThreadSize = "default"
  23. }
  24. shutdown {
  25. # when destroy server, wait seconds
  26. wait = 3
  27. }
  28. serialization = "seata"
  29. compressor = "none"
  30. }
  31. service {
  32. #transaction service group mapping
  33. # order_tx_group 与 yml 中的 “tx-service-group: order_tx_group” 配置一致
  34. # “seata-server” 与 TC 服务器的注册名一致
  35. # 从eureka获取seata-server的地址,再向seata-server注册自己,设置group
  36. # order_tx_group 事务组,对应使用哪个协调器
  37. # seata-server 是注册表中的服务id
  38. vgroupMapping.order_tx_group = "seata-server"
  39. #only support when registry.type=file, please don't set multiple addresses
  40. order_tx_group.grouplist = "127.0.0.1:8091"
  41. #degrade, current not support
  42. enableDegrade = false
  43. #disable seata
  44. disableGlobalTransaction = false
  45. }
  46. client {
  47. rm {
  48. asyncCommitBufferLimit = 10000
  49. lock {
  50. retryInterval = 10
  51. retryTimes = 30
  52. retryPolicyBranchRollbackOnConflict = true
  53. }
  54. reportRetryCount = 5
  55. tableMetaCheckEnable = false
  56. reportSuccessEnable = false
  57. }
  58. tm {
  59. commitRetryCount = 5
  60. rollbackRetryCount = 5
  61. }
  62. undo {
  63. dataValidation = true
  64. logSerialization = "jackson"
  65. logTable = "undo_log"
  66. }
  67. log {
  68. exceptionRate = 100
  69. }
  70. }
3. OrderMapper 添加更新订单状态、删除订单

根据前面的分析,订单数据操作有以下三项:

  • 插入订单
  • 修改订单状态
  • 删除订单

在 OrderMapper 中已经有插入订单的方法了,现在需要添加修改订单和删除订单的方法(删除方法从BaseMapper继承):

  1. package cn.tedu.order.mapper;
  2. import cn.tedu.order.entity.Order;
  3. import com.baomidou.mybatisplus.core.mapper.BaseMapper;
  4. public interface OrderMapper extends BaseMapper<Order> {
  5. //创建正常订单
  6. void create(Order order);
  7. //创建冻结订单
  8. void createFrozen(Order order);
  9. //修订订单状态
  10. void updateStatus(Long orderId,Integer status);
  11. //取消回滚 删除订单,使用继承的方法deleteById()
  12. }

那么对应的 OrderMapper.xml 中也要添加 sql:

  1. <?xml version="1.0" encoding="UTF-8" ?>
  2. <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
  3. <mapper namespace="cn.tedu.order.mapper.OrderMapper" >
  4. <resultMap id="BaseResultMap" type="cn.tedu.order.entity.Order" >
  5. <id column="id" property="id" jdbcType="BIGINT" />
  6. <result column="user_id" property="userId" jdbcType="BIGINT" />
  7. <result column="product_id" property="productId" jdbcType="BIGINT" />
  8. <result column="count" property="count" jdbcType="INTEGER" />
  9. <result column="money" property="money" jdbcType="DECIMAL" />
  10. <result column="status" property="status" jdbcType="INTEGER" />
  11. </resultMap>
  12. <!--关键字必须加反引号-->
  13. <insert id="create">
  14. INSERT INTO `order` (`id`,`user_id`,`product_id`,`count`,`money`,`status`)
  15. VALUES(#{id}, #{userId}, #{productId}, #{count}, #{money},1);
  16. </insert>
  17. <!--创建冻结订单方法-->
  18. <insert id="createFrozen">
  19. INSERT INTO `order` (`id`,`user_id`,`product_id`,`count`,`money`,`status`)
  20. VALUES(#{id}, #{userId}, #{productId}, #{count}, #{money},0);
  21. </insert>
  22. <!--修改订单状态-->
  23. <update id="updateStatus">
  24. update `order` set status = #{status}
  25. where id = #{orderId};
  26. </update>
  27. <!--删除订单-->
  28. <delete id="deleteById">
  29. delete from `order` where id=#{orderId}
  30. </delete>
  31. </mapper>
4. 按照seata tcc 的实现规则,定义TccAction接口和实现
1.添加三个方法,实现TCC三个操作

第一阶段
Try - prepareCreateOrder()
第二阶段
Confirm - commit()
Cancel - rollback()

  1. package cn.tedu.order.tcc;
  2. import io.seata.rm.tcc.api.BusinessActionContext;
  3. import io.seata.rm.tcc.api.BusinessActionContextParameter;
  4. import io.seata.rm.tcc.api.LocalTCC;
  5. import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
  6. import java.math.BigDecimal;
  7. @LocalTCC
  8. public interface OrderTccAction {
  9. /* 为了避开seata的一个bug,不使用封装对象 而是一个个的单独传递参数 */
  10. @TwoPhaseBusinessAction(name = "OrderTccAction")
  11. boolean prepare(BusinessActionContext ctx,
  12. @BusinessActionContextParameter(paramName = "orderId") Long id,
  13. Long userId,
  14. Long productId,
  15. Integer count,
  16. BigDecimal money);
  17. boolean commit(BusinessActionContext ctx);
  18. boolean rollback(BusinessActionContext ctx);
  19. }

实现类:

  1. package cn.tedu.order.tcc;
  2. import cn.tedu.order.entity.Order;
  3. import cn.tedu.order.mapper.OrderMapper;
  4. import io.seata.rm.tcc.api.BusinessActionContext;
  5. import org.checkerframework.checker.units.qual.A;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Component;
  8. import org.springframework.transaction.annotation.Transactional;
  9. import java.math.BigDecimal;
  10. @Component
  11. public class OrderTccActionImpl implements OrderTccAction{
  12. @Autowired
  13. private OrderMapper orderMapper;
  14. @Transactional
  15. @Override
  16. public boolean prepare(BusinessActionContext ctx, Long id, Long userId, Long productId,Integer count, BigDecimal money) {
  17. orderMapper.createFrozen(new Order(id,userId,productId,count,money,0));
  18. //true表示成功
  19. //false表示失败
  20. return true;
  21. }
  22. @Transactional
  23. @Override
  24. public boolean commit(BusinessActionContext ctx) {
  25. //
  26. Long orderId = Long.valueOf(ctx.getActionContext("orderId").toString());
  27. orderMapper.updateStatus(orderId, 1);
  28. return true;
  29. }
  30. @Transactional
  31. @Override
  32. public boolean rollback(BusinessActionContext ctx) {
  33. Long orderId = Long.valueOf(ctx.getActionContext("orderId").toString());
  34. orderMapper.deleteById(orderId);
  35. return true;
  36. }
  37. }
2. 三个方法都添加@Transactional注解,控制本地事务
  1. package cn.tedu.order.tcc;
  2. import cn.tedu.order.entity.Order;
  3. import cn.tedu.order.mapper.OrderMapper;
  4. import io.seata.rm.tcc.api.BusinessActionContext;
  5. import org.checkerframework.checker.units.qual.A;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Component;
  8. import org.springframework.transaction.annotation.Transactional;
  9. import java.math.BigDecimal;
  10. @Component
  11. public class OrderTccActionImpl implements OrderTccAction{
  12. @Autowired
  13. private OrderMapper orderMapper;
  14. @Transactional
  15. @Override
  16. public boolean prepare(BusinessActionContext ctx, Long id, Long userId, Long productId,Integer count, BigDecimal money) {
  17. orderMapper.createFrozen(new Order(id,userId,productId,count,money,0));
  18. //true表示成功
  19. //false表示失败
  20. return true;
  21. }
  22. @Transactional
  23. @Override
  24. public boolean commit(BusinessActionContext ctx) {
  25. //
  26. Long orderId = Long.valueOf(ctx.getActionContext("orderId").toString());
  27. orderMapper.updateStatus(orderId, 1);
  28. return true;
  29. }
  30. @Transactional
  31. @Override
  32. public boolean rollback(BusinessActionContext ctx) {
  33. Long orderId = Long.valueOf(ctx.getActionContext("orderId").toString());
  34. orderMapper.deleteById(orderId);
  35. return true;
  36. }
  37. }
5.在业务代码中调用 Try 阶段方法

业务代码中不再直接保存订单数据,而是调用 TCC 第一阶段方法prepareCreateOrder(),并添加全局事务注解 @GlobalTransactional:

  1. package cn.tedu.order.service;
  2. import cn.tedu.order.entity.Order;
  3. import cn.tedu.order.fegin.AccountClient;
  4. import cn.tedu.order.fegin.EasyIdClient;
  5. import cn.tedu.order.fegin.StorageClient;
  6. import cn.tedu.order.mapper.OrderMapper;
  7. import cn.tedu.order.tcc.OrderTccActionImpl;
  8. import io.seata.spring.annotation.GlobalTransactional;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.stereotype.Service;
  11. import java.util.Random;
  12. @Service
  13. public class OrderServiceImpl implements OrderService{
  14. @Autowired
  15. private OrderMapper orderMapper;
  16. @Autowired
  17. private EasyIdClient easyIdClient;
  18. @Autowired
  19. private AccountClient accountClient;
  20. @Autowired
  21. private StorageClient storageClient;
  22. @Autowired
  23. private OrderTccActionImpl tcc;
  24. @GlobalTransactional //启动全局事务
  25. @Override
  26. public void create(Order order) {
  27. //远程调用发号器,生成订单id
  28. String s = easyIdClient.nextId("order_business");
  29. Long id = Long.valueOf(s);
  30. order.setId(id);
  31. /*//先临时随机产生id,加了发号器后,这行代码删除 Long id = Math.abs(new Random().nextLong());*/
  32. /* tcc是一个动态代理对象,不是原始的对象 用AOP切入了代码,会拦截调用,新建上下文对象并传入目标方法 */
  33. //冻结订单
  34. tcc.prepare(
  35. null,
  36. order.getId(),
  37. order.getUserId(),
  38. order.getProductId(),
  39. order.getCount(),
  40. order.getMoney()
  41. );
  42. //orderMapper.create(order);//创建正常订单
  43. // TODO:远程调用库存,减少库存
  44. storageClient.decrease(order.getProductId(),order.getCount());
  45. // TODO:远程调用账号,扣减账户
  46. //accountClient.decrease(order.getUserId(),order.getMoney());
  47. }
  48. }
6.启动 order 进行测试

按顺序启动服务:

  1. Eureka
  2. Seata Server
  3. Easy Id Generator
  4. Order

调用保存订单,地址:

观察控制台日志:

查看数据库表中的订单数据:

4.storage添加“减少库存”分支事务

在库存项目中执行减少库存:

我们要添加以下 TCC 事务操作的代码:

  • Try - 第一阶段,冻结数据阶段,将要减少的库存量先冻结:

Confirm - 第二阶段,提交事务,使用冻结的库存完成业务数据处理:

Cancel - 第二阶段,回滚事务,冻结的库存解冻,恢复以前的库存量:

1.配置

有三个文件需要配置:

1.application.yml
  1. spring:
  2. application:
  3. name: storage
  4. datasource:
  5. driver-class-name: com.mysql.cj.jdbc.Driver
  6. url: jdbc:mysql://localhost/seata_storage?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8
  7. username: root
  8. password: root
  9. cloud:
  10. alibaba:
  11. seata:
  12. tx-service-group: order_tx_group
  13. server:
  14. port: 8082
  15. eureka:
  16. client:
  17. service-url:
  18. defaultZone: http://localhost:8761/eureka
  19. instance:
  20. prefer-ip-address: true
  21. instance-id: ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port}
  22. mybatis-plus:
  23. type-aliases-package: cn.tedu.storage.entity
  24. mapper-locations:
  25. - classpath:/mapper/*Mapper.xml
  26. configuration:
  27. map-underscore-to-camel-case: true
  28. logging:
  29. level:
  30. cn.tedu.storage.mapper: DEBUG
2.registry.conf
  1. registry {
  2. # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  3. type = "eureka"
  4. nacos {
  5. serverAddr = "localhost"
  6. namespace = ""
  7. cluster = "default"
  8. }
  9. eureka {
  10. # 连接eureka,要从注册表发现 seata-server
  11. serviceUrl = "http://localhost:8761/eureka"
  12. # application = "default"
  13. # weight = "1"
  14. }
  15. redis {
  16. serverAddr = "localhost:6379"
  17. db = "0"
  18. password = ""
  19. cluster = "default"
  20. timeout = "0"
  21. }
  22. zk {
  23. cluster = "default"
  24. serverAddr = "127.0.0.1:2181"
  25. session.timeout = 6000
  26. connect.timeout = 2000
  27. username = ""
  28. password = ""
  29. }
  30. consul {
  31. cluster = "default"
  32. serverAddr = "127.0.0.1:8500"
  33. }
  34. etcd3 {
  35. cluster = "default"
  36. serverAddr = "http://localhost:2379"
  37. }
  38. sofa {
  39. serverAddr = "127.0.0.1:9603"
  40. application = "default"
  41. region = "DEFAULT_ZONE"
  42. datacenter = "DefaultDataCenter"
  43. cluster = "default"
  44. group = "SEATA_GROUP"
  45. addressWaitTime = "3000"
  46. }
  47. file {
  48. name = "file.conf"
  49. }
  50. }
  51. config {
  52. # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
  53. type = "file"
  54. nacos {
  55. serverAddr = "localhost"
  56. namespace = ""
  57. group = "SEATA_GROUP"
  58. }
  59. consul {
  60. serverAddr = "127.0.0.1:8500"
  61. }
  62. apollo {
  63. app.id = "seata-server"
  64. apollo.meta = "http://192.168.1.204:8801"
  65. namespace = "application"
  66. }
  67. zk {
  68. serverAddr = "127.0.0.1:2181"
  69. session.timeout = 6000
  70. connect.timeout = 2000
  71. username = ""
  72. password = ""
  73. }
  74. etcd3 {
  75. serverAddr = "http://localhost:2379"
  76. }
  77. file {
  78. name = "file.conf"
  79. }
  80. }
3.file.conf
  1. transport {
  2. # tcp udt unix-domain-socket
  3. type = "TCP"
  4. #NIO NATIVE
  5. server = "NIO"
  6. #enable heartbeat
  7. heartbeat = true
  8. # the client batch send request enable
  9. enableClientBatchSendRequest = true
  10. #thread factory for netty
  11. threadFactory {
  12. bossThreadPrefix = "NettyBoss"
  13. workerThreadPrefix = "NettyServerNIOWorker"
  14. serverExecutorThread-prefix = "NettyServerBizHandler"
  15. shareBossWorker = false
  16. clientSelectorThreadPrefix = "NettyClientSelector"
  17. clientSelectorThreadSize = 1
  18. clientWorkerThreadPrefix = "NettyClientWorkerThread"
  19. # netty boss thread size,will not be used for UDT
  20. bossThreadSize = 1
  21. #auto default pin or 8
  22. workerThreadSize = "default"
  23. }
  24. shutdown {
  25. # when destroy server, wait seconds
  26. wait = 3
  27. }
  28. serialization = "seata"
  29. compressor = "none"
  30. }
  31. service {
  32. #transaction service group mapping
  33. # order_tx_group 与 yml 中的 “tx-service-group: order_tx_group” 配置一致
  34. # “seata-server” 与 TC 服务器的注册名一致
  35. # 从eureka获取seata-server的地址,再向seata-server注册自己,设置group
  36. # order_tx_group 事务组,对应使用哪个协调器
  37. # seata-server 是注册表中的服务id
  38. vgroupMapping.order_tx_group = "seata-server"
  39. #only support when registry.type=file, please don't set multiple addresses
  40. order_tx_group.grouplist = "127.0.0.1:8091"
  41. #degrade, current not support
  42. enableDegrade = false
  43. #disable seata
  44. disableGlobalTransaction = false
  45. }
  46. client {
  47. rm {
  48. asyncCommitBufferLimit = 10000
  49. lock {
  50. retryInterval = 10
  51. retryTimes = 30
  52. retryPolicyBranchRollbackOnConflict = true
  53. }
  54. reportRetryCount = 5
  55. tableMetaCheckEnable = false
  56. reportSuccessEnable = false
  57. }
  58. tm {
  59. commitRetryCount = 5
  60. rollbackRetryCount = 5
  61. }
  62. undo {
  63. dataValidation = true
  64. logSerialization = "jackson"
  65. logTable = "undo_log"
  66. }
  67. log {
  68. exceptionRate = 100
  69. }
  70. }

这三个文件的设置与上面 order 项目的配置完全相同,请参考上面订单配置一章进行配置。

2.StorageMapper 添加冻结库存相关方法

根据前面的分析,库存数据操作有以下三项:

  • 冻结库存
  • 冻结库存量修改为已售出量
  • 解冻库存

在 StorageMapper 中添加三个方法:

  1. package cn.tedu.storage.mapper;
  2. import cn.tedu.storage.entity.Storage;
  3. import com.baomidou.mybatisplus.core.mapper.BaseMapper;
  4. public interface StorageMapper extends BaseMapper<Storage> {
  5. //减少库存
  6. void decrease(Long productId,Integer count);
  7. //查询库存,用来判断有没有足够的库存
  8. Storage findByProductId(Long productId);
  9. //冻结库存
  10. void updateResidueToFrozen(Long productId,Integer count);
  11. //冻结 --> 已售出
  12. void updateFrozenToUsed(Long productId,Integer count);
  13. //冻结 --> 可用
  14. void updateFrozenToResidue(Long productId,Integer count);
  15. }

对应的 StorageMapper.xml 中也要添加 sql:

  1. <?xml version="1.0" encoding="UTF-8" ?>
  2. <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
  3. <mapper namespace="cn.tedu.storage.mapper.StorageMapper" >
  4. <resultMap id="BaseResultMap" type="cn.tedu.storage.entity.Storage" >
  5. <id column="id" property="id" jdbcType="BIGINT" />
  6. <result column="product_id" property="productId" jdbcType="BIGINT" />
  7. <result column="total" property="total" jdbcType="INTEGER" />
  8. <result column="used" property="used" jdbcType="INTEGER" />
  9. <result column="residue" property="residue" jdbcType="INTEGER" />
  10. </resultMap>
  11. <update id="decrease">
  12. UPDATE storage
  13. SET used = used + #{count},
  14. residue = residue - #{count}
  15. WHERE product_id = #{productId}
  16. </update>
  17. <!--查询库存-->
  18. <select id="findByProductId" resultMap="BaseResultMap">
  19. select * from storage where product_id = #{productId}
  20. </select>
  21. <!--冻结库存-->
  22. <update id="updateResidueToFrozen">
  23. update storage
  24. set Residue=Residue-#{count},Frozen=Frozen+#{count}
  25. where product_id = #{productId}
  26. </update>
  27. <!--冻结 &ndash;&gt; 已售出-->
  28. <update id="updateFrozenToUsed">
  29. update storage
  30. set Frozen=Frozen-#{count},Used=Used+#{count}
  31. where product_id = #{productId}
  32. </update>
  33. <!--冻结 &ndash;&gt; 可用-->
  34. <update id="updateFrozenToResidue">
  35. update storage
  36. set Frozen=Frozen-#{count},Residue=Residue-#{count}
  37. where product_id = #{productId}
  38. </update>
  39. </mapper>
3.添加 TCC 接口,在接口中添加以下方法:

Try - prepareDecreaseStorage()
Confirm - commit()
Cancel - rollback()

  1. package cn.tedu.storage.tcc;
  2. import io.seata.rm.tcc.api.BusinessActionContext;
  3. import io.seata.rm.tcc.api.BusinessActionContextParameter;
  4. import io.seata.rm.tcc.api.LocalTCC;
  5. import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
  6. /* 1.@LocalTCC 2.添加三个方法,都添加BusssinessActionContext 参数 3.T方法添加@TwoPhaseBusinessAction 4.T方法添加业务数据参数 5.需要添加到上下文对象的参数,添加@BussinessActionContextParameter */
  7. @LocalTCC
  8. public interface StorageTccAction {
  9. @TwoPhaseBusinessAction(name = "StorageTccAction")
  10. boolean prepare(BusinessActionContext ctx,
  11. @BusinessActionContextParameter(paramName = "productId") Long productId,
  12. @BusinessActionContextParameter(paramName = "count") Integer count);
  13. boolean commit(BusinessActionContext ctx);
  14. boolean rollback(BusinessActionContext ctx);
  15. }

实现类:

  1. package cn.tedu.storage.tcc;
  2. import cn.tedu.storage.entity.Storage;
  3. import cn.tedu.storage.mapper.StorageMapper;
  4. import io.seata.rm.tcc.api.BusinessActionContext;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.stereotype.Component;
  7. import org.springframework.transaction.annotation.Transactional;
  8. import rx.Producer;
  9. @Component
  10. public class StorageTccActionImpl implements StorageTccAction {
  11. @Autowired
  12. private StorageMapper storageMapper;
  13. @Transactional
  14. @Override
  15. public boolean prepare(BusinessActionContext ctx, Long productId, Integer count) {
  16. Storage storage = storageMapper.findByProductId(productId);
  17. if (storage.getResidue() < count){
  18. throw new RuntimeException("库存不足");
  19. }
  20. storageMapper.updateResidueToFrozen(productId, count);
  21. return true;
  22. }
  23. @Transactional
  24. @Override
  25. public boolean commit(BusinessActionContext ctx) {
  26. Long productId = Long.valueOf(ctx.getActionContext("productId").toString());
  27. Integer count = Integer.valueOf(ctx.getActionContext("count").toString());
  28. storageMapper.updateFrozenToUsed(productId, count);
  29. if (Math.random()<0.8){
  30. return false;
  31. }
  32. return true;
  33. }
  34. @Transactional
  35. @Override
  36. public boolean rollback(BusinessActionContext ctx) {
  37. Long productId = Long.valueOf(ctx.getActionContext("productId").toString());
  38. Integer count = Integer.valueOf(ctx.getActionContext("count").toString());
  39. storageMapper.updateFrozenToResidue(productId, count);
  40. return true;
  41. }
  42. }
4.在业务代码中调用 Try 阶段方法

业务代码中调用 TCC 第一阶段方法prepareDecreaseStorage(),并添加全局事务注解 @GlobalTransactional:

  1. package cn.tedu.storage.service;
  2. import cn.tedu.storage.mapper.StorageMapper;
  3. import cn.tedu.storage.tcc.StorageTccAction;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Service;
  6. @Service
  7. public class StorageServiceImpl implements StorageService{
  8. @Autowired
  9. private StorageTccAction tcc;
  10. @Override
  11. public void decrease(Long productId, Integer count) {
  12. tcc.prepare(null, productId, count);
  13. }
  14. }
5.启动 storage 进行测试

按顺序启动服务:

  1. Eureka
  2. Seata Server
  3. Easy Id Generator
  4. Storage
  5. Order
    调用保存订单,地址:

观察 storage 的控制台日志:

查看数据库表中的库存数据:

5.account添加“扣减金额”分支事务

扣减金额 TCC 事务分析请见Seata TCC模式-TCC模式介绍
配置
有三个文件需要配置:

application.yml
registry.conf
file.conf
这三个文件的设置与上面 order 项目的配置完全相同,请参考上面订单配置一章进行配置。

1.AccountMapper 添加冻结库存相关方法

根据前面的分析,库存数据操作有以下三项:

  • 冻结库存
  • 冻结库存量修改为已售出量
  • 解冻库存

在 AccountMapper 中添加三个方法:

添加 TCC 接口,在接口中添加以下方法:

Try - prepareDecreaseAccount()
Confirm - commit()
Cancel - rollback()

实现类:

在业务代码中调用 Try 阶段方法
业务代码中调用 TCC 第一阶段方法prepareDecreaseAccount(),并添加全局事务注解 @GlobalTransactional:

启动 account 进行测试
按顺序启动服务:

Eureka
Seata Server
Easy Id Generator
Storage
Account
Order
调用保存订单,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100

观察 account 的控制台日志:

相关文章