Spring Boot 分布式事务 Seata 入门

x33g5p2x  于2021-12-21 转载在 其他  
字(35.7k)|赞(0)|评价(0)|浏览(543)

1. 概述

《芋道 Seata 极简入门》文章中,我们对 Seata 进行了简单的了解,并完成了 Seata 的部署。而本文,我们将 Spring Boot 应用接入 Seata 来实现分布式事务。
Seata 是阿里开源的一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。

2. AT 模式 + 多数据源

示例代码对应仓库:lab-52-multiple-datasource 。

在 Spring Boot 单体项目中,如果使用了多个数据源,我们就需要考虑多个数据源的一致性,面临分布式事务的问题。本小节,我们将使用 Seata 的 AT 模式,解决该问题。

友情提示:对 Seata 的 AT 模式不了解的胖友,可以阅读《Seata 文档 —— AT 模式》文档。

我们以用户购买商品的业务逻辑,来作为具体示例,一共会有三个模块的 Service,分别对应不同的数据库。整体如下图所示:

下面,我们来新建 lab-52-multiple-datasource 项目,最终结构如下图:

2.1 初始化数据库

使用 data.sql 脚本,创建 seata_orderseata_storageseata_amount 三个库。脚本内容如下:

  1. # Order
  2. DROP DATABASE IF EXISTS seata_order;
  3. CREATE DATABASE seata_order;
  4. CREATE TABLE seata_order.orders
  5. (
  6. id INT(11) NOT NULL AUTO_INCREMENT,
  7. user_id INT(11) DEFAULT NULL,
  8. product_id INT(11) DEFAULT NULL,
  9. pay_amount DECIMAL(10, 0) DEFAULT NULL,
  10. add_time DATETIME DEFAULT CURRENT_TIMESTAMP,
  11. last_update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  12. PRIMARY KEY (id)
  13. ) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8;
  14. CREATE TABLE seata_order.undo_log
  15. (
  16. id BIGINT(20) NOT NULL AUTO_INCREMENT,
  17. branch_id BIGINT(20) NOT NULL,
  18. xid VARCHAR(100) NOT NULL,
  19. context VARCHAR(128) NOT NULL,
  20. rollback_info LONGBLOB NOT NULL,
  21. log_status INT(11) NOT NULL,
  22. log_created DATETIME NOT NULL,
  23. log_modified DATETIME NOT NULL,
  24. PRIMARY KEY (id),
  25. UNIQUE KEY ux_undo_log (xid, branch_id)
  26. ) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8;
  27. # Storage
  28. DROP DATABASE IF EXISTS seata_storage;
  29. CREATE DATABASE seata_storage;
  30. CREATE TABLE seata_storage.product
  31. (
  32. id INT(11) NOT NULL AUTO_INCREMENT,
  33. stock INT(11) DEFAULT NULL,
  34. last_update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  35. PRIMARY KEY (id)
  36. ) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8;
  37. INSERT INTO seata_storage.product (id, stock) VALUES (1, 10); # 插入一条产品的库存
  38. CREATE TABLE seata_storage.undo_log
  39. (
  40. id BIGINT(20) NOT NULL AUTO_INCREMENT,
  41. branch_id BIGINT(20) NOT NULL,
  42. xid VARCHAR(100) NOT NULL,
  43. context VARCHAR(128) NOT NULL,
  44. rollback_info LONGBLOB NOT NULL,
  45. log_status INT(11) NOT NULL,
  46. log_created DATETIME NOT NULL,
  47. log_modified DATETIME NOT NULL,
  48. PRIMARY KEY (id),
  49. UNIQUE KEY ux_undo_log (xid, branch_id)
  50. ) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8;
  51. # Amount
  52. DROP DATABASE IF EXISTS seata_amount;
  53. CREATE DATABASE seata_amount;
  54. CREATE TABLE seata_amount.account
  55. (
  56. id INT(11) NOT NULL AUTO_INCREMENT,
  57. balance DOUBLE DEFAULT NULL,
  58. last_update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  59. PRIMARY KEY (id)
  60. ) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8;
  61. CREATE TABLE seata_amount.undo_log
  62. (
  63. id BIGINT(20) NOT NULL AUTO_INCREMENT,
  64. branch_id BIGINT(20) NOT NULL,
  65. xid VARCHAR(100) NOT NULL,
  66. context VARCHAR(128) NOT NULL,
  67. rollback_info LONGBLOB NOT NULL,
  68. log_status INT(11) NOT NULL,
  69. log_created DATETIME NOT NULL,
  70. log_modified DATETIME NOT NULL,
  71. PRIMARY KEY (id),
  72. UNIQUE KEY ux_undo_log (xid, branch_id)
  73. ) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8;
  74. INSERT INTO seata_amount.account (id, balance) VALUES (1, 1);

其中,每个库中的 undo_log 表,是 Seata AT 模式必须创建的表,主要用于分支事务的回滚。

另外,考虑到测试方便,我们插入了一条 id = 1 的 account 记录,和一条 id = 1 的 product 记录。

2.2 引入依赖

创建 pom.xml 文件,引入相关的依赖。内容如下:

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <parent>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>2.2.2.RELEASE</version>
  9. <relativePath/> <!-- lookup parent from repository -->
  10. </parent>
  11. <modelVersion>4.0.0</modelVersion>
  12. <artifactId>lab-52-multiple-datasource</artifactId>
  13. <dependencies>
  14. <!-- 实现对 Spring MVC 的自动化配置 -->
  15. <dependency>
  16. <groupId>org.springframework.boot</groupId>
  17. <artifactId>spring-boot-starter-web</artifactId>
  18. </dependency>
  19. <!-- 实现对数据库连接池的自动化配置 -->
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-starter-jdbc</artifactId>
  23. </dependency>
  24. <dependency> <!-- 本示例,我们使用 MySQL -->
  25. <groupId>mysql</groupId>
  26. <artifactId>mysql-connector-java</artifactId>
  27. <version>5.1.48</version>
  28. </dependency>
  29. <!-- 实现对 MyBatis 的自动化配置 -->
  30. <dependency>
  31. <groupId>org.mybatis.spring.boot</groupId>
  32. <artifactId>mybatis-spring-boot-starter</artifactId>
  33. <version>2.1.2</version>
  34. </dependency>
  35. <!-- 实现对 dynamic-datasource 的自动化配置 -->
  36. <dependency>
  37. <groupId>com.baomidou</groupId>
  38. <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
  39. <version>3.0.0</version>
  40. </dependency>
  41. <!-- 实现对 Seata 的自动化配置 -->
  42. <dependency>
  43. <groupId>io.seata</groupId>
  44. <artifactId>seata-spring-boot-starter</artifactId>
  45. <version>1.1.0</version>
  46. </dependency>
  47. </dependencies>
  48. </project>

① 引入 dynamic-datasource-spring-boot-starter 依赖,实现对 dynamic-datasource 的自动配置,用于多数据源的切换功能。

友情提示:关于数据源的切换功能,胖友可以阅读《芋道 Spring Boot 多数据源(读写分离)入门》文章,贼详细。

② 引入 seata-spring-boot-starter 依赖,实现对 Seata 的自动配置。

2.3 配置文件

创建 application.yaml 配置文件,添加相关的配置项。内容如下:

  1. server:
  2. port: 8081 # 端口
  3. spring:
  4. application:
  5. name: multi-datasource-service # 应用名
  6. datasource:
  7. # dynamic-datasource-spring-boot-starter 动态数据源的配配项,对应 DynamicDataSourceProperties 类
  8. dynamic:
  9. primary: order-ds # 设置默认的数据源或者数据源组,默认值即为 master
  10. datasource:
  11. # 订单 order 数据源配置
  12. order-ds:
  13. url: jdbc:mysql://127.0.0.1:3306/seata_order?useSSL=false&useUnicode=true&characterEncoding=UTF-8
  14. driver-class-name: com.mysql.jdbc.Driver
  15. username: root
  16. password:
  17. # 账户 pay 数据源配置
  18. amount-ds:
  19. url: jdbc:mysql://127.0.0.1:3306/seata_pay?useSSL=false&useUnicode=true&characterEncoding=UTF-8
  20. driver-class-name: com.mysql.jdbc.Driver
  21. username: root
  22. password:
  23. # 库存 storage 数据源配置
  24. storage-ds:
  25. url: jdbc:mysql://127.0.0.1:3306/seata_storage?useSSL=false&useUnicode=true&characterEncoding=UTF-8
  26. driver-class-name: com.mysql.jdbc.Driver
  27. username: root
  28. password:
  29. seata: true # 是否启动对 Seata 的集成
  30. # Seata 配置项,对应 SeataProperties 类
  31. seata:
  32. application-id: ${spring.application.name} # Seata 应用编号,默认为 ${spring.application.name}
  33. tx-service-group: ${spring.application.name}-group # Seata 事务组编号,用于 TC 集群名
  34. # 服务配置项,对应 ServiceProperties 类
  35. service:
  36. # 虚拟组和分组的映射
  37. vgroup-mapping:
  38. multi-datasource-service-group: default
  39. # 分组和 Seata 服务的映射
  40. grouplist:
  41. default: 127.0.0.1:8091

① spring.datasource.dynamic 配置项,设置 dynamic-datasource-spring-boot-starter 动态数据源的配置项,对应 DynamicDataSourceProperties 类。

注意,一定要设置 spring.datasource.dynamic.seata 配置项为 true,开启对 Seata 的集成!!!艿艿一开始忘记配置,导致 Seata 全局事务回滚失败。

② seata 配置项,设置 Seata 的配置项目,对应 SeataProperties 类。

  • application-id 配置项,对应 Seata 应用编号,默认为 ${spring.application.name}。实际上,可以不进行设置。
  • tx-service-group 配置项,Seata 事务组编号,用于 TC 集群名。

③ seata.service 配置项,Seata 服务配置项,对应 ServiceProperties 类。它主要用于 Seata 在事务分组的特殊设计,可见《Seata 文档 —— 事务分组专题》。如果不能理解的胖友,可以见如下图:

简单来说,就是多了一层虚拟映射。这里,我们直接设置 TC Server 的地址,为 127.0.0.1:8091

2.4 订单模块

2.4.1 OrderController

创建 OrderController 类,提供 order/create 下单 HTTP API。代码如下:

  1. @RestController
  2. @RequestMapping("/order")
  3. public class OrderController {
  4. private Logger logger = LoggerFactory.getLogger(OrderController.class);
  5. @Autowired
  6. private OrderService orderService;
  7. @PostMapping("/create")
  8. public Integer createOrder(@RequestParam("userId") Long userId,
  9. @RequestParam("productId") Long productId,
  10. @RequestParam("price") Integer price) throws Exception {
  11. logger.info("[createOrder] 收到下单请求,用户:{}, 商品:{}, 价格:{}", userId, productId, price);
  12. return orderService.createOrder(userId, productId, price);
  13. }
  14. }
  • 该 API 中,会调用 OrderService 进行下单。
    友情提示:因为这个是示例项目,所以直接传入 price 参数,作为订单的金额,实际肯定不是这样的,哈哈哈~

2.4.2 OrderService

创建 OrderService 接口,定义了创建订单的方法。代码如下:

  1. /**
  2. * 订单 Service
  3. */
  4. public interface OrderService {
  5. /**
  6. * 创建订单
  7. *
  8. * @param userId 用户编号
  9. * @param productId 产品编号
  10. * @param price 价格
  11. * @return 订单编号
  12. * @throws Exception 创建订单失败,抛出异常
  13. */
  14. Integer createOrder(Long userId, Long productId, Integer price) throws Exception;
  15. }

2.4.3 OrderServiceImpl

创建 OrderServiceImpl 类,实现创建订单的方法。代码如下:

  1. @Service
  2. public class OrderServiceImpl implements OrderService {
  3. private Logger logger = LoggerFactory.getLogger(getClass());
  4. @Autowired
  5. private OrderDao orderDao;
  6. @Autowired
  7. private AccountService accountService;
  8. @Autowired
  9. private ProductService productService;
  10. @Override
  11. @DS(value = "order-ds") // <1>
  12. @GlobalTransactional // <2>
  13. public Integer createOrder(Long userId, Long productId, Integer price) throws Exception {
  14. Integer amount = 1; // 购买数量,暂时设置为 1。
  15. logger.info("[createOrder] 当前 XID: {}", RootContext.getXID());
  16. // <3> 扣减库存
  17. productService.reduceStock(productId, amount);
  18. // <4> 扣减余额
  19. accountService.reduceBalance(userId, price);
  20. // <5> 保存订单
  21. OrderDO order = new OrderDO().setUserId(userId).setProductId(productId).setPayAmount(amount * price);
  22. orderDao.saveOrder(order);
  23. logger.info("[createOrder] 保存订单: {}", order.getId());
  24. // 返回订单编号
  25. return order.getId();
  26. }
  27. }

<1> 处,在类上,添加了 @DS 注解,设置使用 order-ds 订单数据源。

<2> 处,在类上,添加 Seata @GlobalTransactional 注解,声明全局事务

<3> 和 <4> 处,在该方法中,调用 ProductService 扣除商品的库存,调用 AccountService 扣除账户的余额。虽然说,调用是 JVM 进程内的,但是 ProductService 操作的是 product-ds 商品数据源,AccountService 操作的是 account-ds 账户数据源。

<5> 处,在全部调用成功后,调用 OrderDao 保存订单。

2.4.4 OrderDao

创建 OrderDao 接口,定义保存订单的操作。代码如下:

  1. @Mapper
  2. @Repository
  3. public interface OrderDao {
  4. /**
  5. * 插入订单记录
  6. *
  7. * @param order 订单
  8. * @return 影响记录数量
  9. */
  10. @Insert("INSERT INTO orders (user_id, product_id, pay_amount) VALUES (#{userId}, #{productId}, #{payAmount})")
  11. @Options(useGeneratedKeys = true, keyColumn = "id", keyProperty = "id")
  12. int saveOrder(OrderDO order);
  13. }

其中,OrderDO 实体类,对应 orders 表。代码如下: 

  1. /**
  2. * 订单实体
  3. */
  4. public class OrderDO {
  5. /** 订单编号 **/
  6. private Integer id;
  7. /** 用户编号 **/
  8. private Long userId;
  9. /** 产品编号 **/
  10. private Long productId;
  11. /** 支付金额 **/
  12. private Integer payAmount;
  13. // ... 省略 setter/getter 方法
  14. }

2.5 商品模块

2.5.1 ProductService

创建 ProductService 接口,定义了扣除库存的方法。代码如下:

  1. /**
  2. * 商品 Service
  3. */
  4. public interface ProductService {
  5. /**
  6. * 扣减库存
  7. *
  8. * @param productId 商品 ID
  9. * @param amount 扣减数量
  10. * @throws Exception 扣减失败时抛出异常
  11. */
  12. void reduceStock(Long productId, Integer amount) throws Exception;
  13. }

2.5.2 ProductServiceImpl

创建 ProductServiceImpl 类,实现扣减库存的方法。代码如下:

  1. @Service
  2. public class ProductServiceImpl implements ProductService {
  3. private Logger logger = LoggerFactory.getLogger(getClass());
  4. @Autowired
  5. private ProductDao productDao;
  6. @Override
  7. @DS(value = "product-ds") // <1>
  8. @Transactional(propagation = Propagation.REQUIRES_NEW) // <2> 开启新事物
  9. public void reduceStock(Long productId, Integer amount) throws Exception {
  10. logger.info("[reduceStock] 当前 XID: {}", RootContext.getXID());
  11. // <3> 检查库存
  12. checkStock(productId, amount);
  13. logger.info("[reduceStock] 开始扣减 {} 库存", productId);
  14. // <4> 扣减库存
  15. int updateCount = productDao.reduceStock(productId, amount);
  16. // 扣除成功
  17. if (updateCount == 0) {
  18. logger.warn("[reduceStock] 扣除 {} 库存失败", productId);
  19. throw new Exception("库存不足");
  20. }
  21. // 扣除失败
  22. logger.info("[reduceStock] 扣除 {} 库存成功", productId);
  23. }
  24. private void checkStock(Long productId, Integer requiredAmount) throws Exception {
  25. logger.info("[checkStock] 检查 {} 库存", productId);
  26. Integer stock = productDao.getStock(productId);
  27. if (stock < requiredAmount) {
  28. logger.warn("[checkStock] {} 库存不足,当前库存: {}", productId, stock);
  29. throw new Exception("库存不足");
  30. }
  31. }
  32. }

<1> 处,在类上,添加了 @DS 注解,设置使用 product-ds 商品数据源。

<2> 处,在类上,添加了 Spring @Transactional 注解,声明本地事务。也就是说,此处会开启一个 seata_product 库的数据库事务。

  •  

<3> 处,检查库存是否足够,如果不够则抛出 Exception 异常。因为我们需要通过异常,回滚全局异常。

<4> 处,进行扣除库存,如果扣除失败则抛出 Exception 异常。

2.5.3 ProductDao

创建 ProductDao 接口,定义获取和扣除库存的操作。代码如下:

  1. @Mapper
  2. @Repository
  3. public interface ProductDao {
  4. /**
  5. * 获取库存
  6. *
  7. * @param productId 商品编号
  8. * @return 库存
  9. */
  10. @Select("SELECT stock FROM product WHERE id = #{productId}")
  11. Integer getStock(@Param("productId") Long productId);
  12. /**
  13. * 扣减库存
  14. *
  15. * @param productId 商品编号
  16. * @param amount 扣减数量
  17. * @return 影响记录行数
  18. */
  19. @Update("UPDATE product SET stock = stock - #{amount} WHERE id = #{productId} AND stock >= #{amount}")
  20. int reduceStock(@Param("productId") Long productId, @Param("amount") Integer amount);
  21. }

2.6 账户模块

友情提示:逻辑和「2.5 商品模块」基本一致,也是扣减逻辑。

2.6.1 AccountService

创建 AccountService 类,定义扣除余额的方法。代码如下:

  1. /**
  2. * 账户 Service
  3. */
  4. public interface AccountService {
  5. /**
  6. * 扣除余额
  7. *
  8. * @param userId 用户编号
  9. * @param price 扣减金额
  10. * @throws Exception 失败时抛出异常
  11. */
  12. void reduceBalance(Long userId, Integer price) throws Exception;
  13. }

2.6.2 AccountServiceImpl

创建 AccountServiceImpl 类,实现扣除余额的方法。代码如下:

  1. @Service
  2. public class AccountServiceImpl implements AccountService {
  3. private Logger logger = LoggerFactory.getLogger(getClass());
  4. @Autowired
  5. private AccountDao accountDao;
  6. @Override
  7. @DS(value = "account-ds") // <1>
  8. @Transactional(propagation = Propagation.REQUIRES_NEW) // <2> 开启新事物
  9. public void reduceBalance(Long userId, Integer price) throws Exception {
  10. logger.info("[reduceBalance] 当前 XID: {}", RootContext.getXID());
  11. // <3> 检查余额
  12. checkBalance(userId, price);
  13. logger.info("[reduceBalance] 开始扣减用户 {} 余额", userId);
  14. // <4> 扣除余额
  15. int updateCount = accountDao.reduceBalance(price);
  16. // 扣除成功
  17. if (updateCount == 0) {
  18. logger.warn("[reduceBalance] 扣除用户 {} 余额失败", userId);
  19. throw new Exception("余额不足");
  20. }
  21. logger.info("[reduceBalance] 扣除用户 {} 余额成功", userId);
  22. }
  23. private void checkBalance(Long userId, Integer price) throws Exception {
  24. logger.info("[checkBalance] 检查用户 {} 余额", userId);
  25. Integer balance = accountDao.getBalance(userId);
  26. if (balance < price) {
  27. logger.warn("[checkBalance] 用户 {} 余额不足,当前余额:{}", userId, balance);
  28. throw new Exception("余额不足");
  29. }
  30. }
  31. }

<1> 处,在类上,添加了 @DS 注解,设置使用 account-ds 账户数据源。

<2> 处,在类上,添加了 Spring @Transactional 注解,声明本地事务。也就是说,此处会开启一个 seata_account 库的数据库事务。

<3> 处,检查余额是否足够,如果不够则抛出 Exception 异常。因为我们需要通过异常,回滚全局异常。

<4> 处,进行扣除余额,如果扣除失败则抛出 Exception 异常。

2.6.3 AccountDao

创建 AccountDao 接口,定义查询和扣除余额的操作。代码如下:

  1. @Mapper
  2. @Repository
  3. public interface AccountDao {
  4. /**
  5. * 获取账户余额
  6. *
  7. * @param userId 用户 ID
  8. * @return 账户余额
  9. */
  10. @Select("SELECT balance FROM account WHERE id = #{userId}")
  11. Integer getBalance(@Param("userId") Long userId);
  12. /**
  13. * 扣减余额
  14. *
  15. * @param price 需要扣减的数目
  16. * @return 影响记录行数
  17. */
  18. @Update("UPDATE account SET balance = balance - #{price} WHERE id = 1 AND balance >= ${price}")
  19. int reduceBalance(@Param("price") Integer price);
  20. }

2.7 MultipleDatasourceApplication

创建 MultipleDatasourceApplication 类,用于启动项目。代码如下:

  1. @SpringBootApplication
  2. public class MultipleDatasourceApplication {
  3. public static void main(String[] args) {
  4. SpringApplication.run(MultipleDatasourceApplication.class, args);
  5. }
  6. }

2.8 简单测试

下面,我们将测试两种情况:

  1. 分布式事务正常提交
  2. 分布式事务异常回滚

Debug 执行 MultipleDatasourceApplication 启动 Spring Boot 应用。此时,我们可以看到 Seata 相关日志如下:
友情提示:日志的顺序,艿艿做了简单的整理,为了更容易阅读。

  1. # ... 上面还有 Seata 相关 Bean 初始化的日志,忘记加进来了,嘿嘿~
  2. # `dynamic-datasource` 初始化动态数据源
  3. 2020-04-03 21:01:28.305 INFO 24912 --- [ main] c.b.d.d.DynamicRoutingDataSource : dynamic-datasource detect ALIBABA SEATA and enabled it
  4. 2020-04-03 21:01:28.415 INFO 24912 --- [ main] com.alibaba.druid.pool.DruidDataSource : {dataSource-1,order-ds} inited
  5. 2020-04-03 21:01:28.416 INFO 24912 --- [ main] com.alibaba.druid.pool.DruidDataSource : {dataSource-2,account-ds} inited
  6. 2020-04-03 21:01:28.417 INFO 24912 --- [ main] com.alibaba.druid.pool.DruidDataSource : {dataSource-3,product-ds} inited
  7. # 给数据源增加 Seata 的数据源代理
  8. 2020-04-03 21:01:28.933 INFO 24912 --- [ main] s.s.a.d.SeataDataSourceBeanPostProcessor : Auto proxy of [dataSource]
  9. # 加载 Druid 提供的 SQL 解析器
  10. 2020-04-03 21:01:28.742 INFO 24912 --- [ main] i.s.common.loader.EnhancedServiceLoader : load DbTypeParser[druid] extension by class[io.seata.sqlparser.druid.DruidDelegatingDbTypeParser]
  11. # 连接到 Seata TC Server 服务器
  12. 2020-04-03 21:01:28.750 INFO 24912 --- [ main] i.s.c.r.netty.NettyClientChannelManager : will connect to 127.0.0.1:8091
  13. 2020-04-03 21:01:28.752 INFO 24912 --- [ main] i.s.core.rpc.netty.NettyPoolableFactory : NettyPool create channel to transactionRole:RMROLE,address:127.0.0.1:8091,msg:< RegisterRMRequest{resourceIds='jdbc:mysql://127.0.0.1:3306/seata_product', applicationId='multi-datasource-service', transactionServiceGroup='multi-datasource-service-group'} >
  14. # 加载 Seata 序列化器
  15. 2020-04-03 21:01:28.883 INFO 24912 --- [lector_RMROLE_1] i.s.common.loader.EnhancedServiceLoader : load Serializer[SEATA] extension by class[io.seata.serializer.seata.SeataSerializer]
  16. # 注册 Seata Resource Manager 到 Seata TC Server 成功
  17. 2020-04-03 21:01:28.751 INFO 24912 --- [ main] io.seata.core.rpc.netty.RmRpcClient : RM will register :jdbc:mysql://127.0.0.1:3306/seata_product
  18. 2020-04-03 21:01:28.902 INFO 24912 --- [ main] io.seata.core.rpc.netty.RmRpcClient : register RM success. server version:1.1.0,channel:[id: 0x0ec2ca91, L:/127.0.0.1:56463 - R:/127.0.0.1:8091]
  19. 2020-04-03 21:01:28.911 INFO 24912 --- [ main] i.s.core.rpc.netty.NettyPoolableFactory : register success, cost 52 ms, version:1.1.0,role:RMROLE,channel:[id: 0x0ec2ca91, L:/127.0.0.1:56463 - R:/127.0.0.1:8091]
  20. 2020-04-03 21:01:28.916 INFO 24912 --- [ main] io.seata.core.rpc.netty.RmRpcClient : will register resourceId:jdbc:mysql://127.0.0.1:3306/seata_account
  21. 2020-04-03 21:01:28.920 INFO 24912 --- [ main] io.seata.core.rpc.netty.RmRpcClient : will register resourceId:jdbc:mysql://127.0.0.1:3306/seata_order
  22. 2020-04-03 21:01:28.937 INFO 24912 --- [ main] io.seata.core.rpc.netty.RmRpcClient : will register resourceId:jdbc:mysql://127.0.0.1:3306/seata_order
  23. # `dynamic-datasource` 针对 Seata 的集成
  24. 2020-04-03 21:01:28.921 INFO 24912 --- [ main] c.b.d.d.DynamicRoutingDataSource : dynamic-datasource - load a datasource named [order-ds] success
  25. 2020-04-03 21:01:28.912 INFO 24912 --- [ main] c.b.d.d.DynamicRoutingDataSource : dynamic-datasource [product-ds] wrap seata plugin
  26. 2020-04-03 21:01:28.912 INFO 24912 --- [ main] c.b.d.d.DynamicRoutingDataSource : dynamic-datasource - load a datasource named [product-ds] success
  27. 2020-04-03 21:01:28.916 INFO 24912 --- [ main] c.b.d.d.DynamicRoutingDataSource : dynamic-datasource [account-ds] wrap seata plugin
  28. 2020-04-03 21:01:28.916 INFO 24912 --- [ main] c.b.d.d.DynamicRoutingDataSource : dynamic-datasource - load a datasource named [account-ds] success
  29. 2020-04-03 21:01:28.921 INFO 24912 --- [ main] c.b.d.d.DynamicRoutingDataSource : dynamic-datasource [order-ds] wrap seata plugin
  30. 2020-04-03 21:01:28.921 INFO 24912 --- [ main] c.b.d.d.DynamicRoutingDataSource : dynamic-datasource initial loaded [3] datasource,primary datasource named [order-ds]
  31. # 因为 OrderServiceImpl 添加了 `@GlobalTransactional` 注解,所以创建其代理,用于全局事务。
  32. 2020-04-03 21:01:29.115 INFO 24912 --- [ main] i.s.s.a.GlobalTransactionScanner : Bean[cn.iocoder.springboot.lab52.seatademo.service.impl.OrderServiceImpl$$EnhancerBySpringCGLIB$$f38d0660] with name [orderServiceImpl] would use interceptor [io.seata.spring.annotation.GlobalTransactionalInterceptor]

2.8.1 正常流程

① 先查询下目前数据库的数据情况。如下图所示:

② 使用 Postman 模拟调用 http://127.0.0.1:8081/order/create 创建订单的接口,如下图所示:

此时,在控制台打印日志如下图所示:

再查询下目前数据库的数据情况。如下图所示:

2.8.2 异常流程

① 先查询下目前数据库的数据情况。如下图所示:

② 在 OrderServiceImpl 的 #createOrder(...) 方法上,打上断点如下图,方便我们看到 product 表的 balance 被减少:

友情提示:这里忘记截图了,稍后 IDEA 停留在该断点时,胖友可以去查询 product 表,会发现 balance 已经减少。

③ 使用 Postman 模拟调用 http://127.0.0.1:8081/order/create 创建订单的接口,如下图所示:

此时,在控制台打印日志如下图所示:

再查询下目前数据库的数据情况。如下图所示:

3. AT 模式 + HttpClient 远程调用

示例代码对应仓库:

本小节,我们会将「2. AT 模式 + 多数据源」小节的用户购买商品的 Spring Boot 单体应用,拆成分多个 Spring Boot 应用,通过 Apache HttpClient 来实现 HTTP 远程调用每个 Spring Boot 应用提供的 Restful API 接口。整体如下图所示:

友情提示:早期的微服务架构,会采用 Nginx 对后端的服务进行负载均衡,而服务提供者使用 HttpClient 进行远程 HTTP 调用。例如说:

Seata 提供了 seata-http 项目,对 Apache HttpClient 进行集成。实现原理是:

  • 服务消费者,使用 Seata 封装的 AbstractHttpExecutor 执行器,在使用HttpClient 发起 HTTP 调用时,将 Seata 全局事务 XID 通过 Header 传递。
  • 服务提供者,使用 Seata 提供的 SpringMVC TransactionPropagationIntercepter 拦截器,将 Header 中的 Seata 全局事务 XID 解析出来,设置到 Seata 上下文 中。

如此,我们便实现了多个 Spring Boot 应用的 Seata 全局事务的传播

下面,我们来新建 lab-52-seata-at-httpclient-demo 模块,包含三个 Spring Boot 项目。最终结构如下图:

3.1 初始化数据库

使用 data.sql 脚本,创建 seata_orderseata_storageseata_amount 三个库
友情提示:整体内容和「2.1 初始化数据库」小节一样。

3.2 订单服务

新建 lab-52-seata-at-httpclient-demo-order-service 项目,作为订单服务。它主要提供 /order/create 接口,实现下单逻辑。
友情提示:整体内容和「2.4 订单模块」一致。

3.2.1 引入依赖

创建 pom.xml 文件,引入相关的依赖。内容如下:

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <parent>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>2.2.2.RELEASE</version>
  9. <relativePath/> <!-- lookup parent from repository -->
  10. </parent>
  11. <modelVersion>4.0.0</modelVersion>
  12. <artifactId>lab-52-seata-at-httpclient-demo-account-service</artifactId>
  13. <dependencies>
  14. <!-- 实现对 Spring MVC 的自动化配置 -->
  15. <dependency>
  16. <groupId>org.springframework.boot</groupId>
  17. <artifactId>spring-boot-starter-web</artifactId>
  18. </dependency>
  19. <!-- 实现对数据库连接池的自动化配置 -->
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-starter-jdbc</artifactId>
  23. </dependency>
  24. <dependency> <!-- 本示例,我们使用 MySQL -->
  25. <groupId>mysql</groupId>
  26. <artifactId>mysql-connector-java</artifactId>
  27. <version>5.1.48</version>
  28. </dependency>
  29. <!-- 实现对 MyBatis 的自动化配置 -->
  30. <dependency>
  31. <groupId>org.mybatis.spring.boot</groupId>
  32. <artifactId>mybatis-spring-boot-starter</artifactId>
  33. <version>2.1.2</version>
  34. </dependency>
  35. <!-- 实现对 Seata 的自动化配置 -->
  36. <dependency>
  37. <groupId>io.seata</groupId>
  38. <artifactId>seata-spring-boot-starter</artifactId>
  39. <version>1.1.0</version>
  40. </dependency>
  41. <!-- 实现 Seata 对 HttpClient 的集成支持 -->
  42. <dependency>
  43. <groupId>io.seata</groupId>
  44. <artifactId>seata-http</artifactId>
  45. <version>1.1.0</version>
  46. </dependency>
  47. <!-- Apache HttpClient 依赖 -->
  48. <dependency>
  49. <groupId>org.apache.httpcomponents</groupId>
  50. <artifactId>httpclient</artifactId>
  51. <version>4.5.8</version>
  52. </dependency>
  53. </dependencies>
  54. </project>

① 引入 seata-spring-boot-starter 依赖,实现对 Seata 的自动配置。

② 引入 seata-http 依赖,实现 Seata 对 HttpClient 的集成支持。

3.2.2 配置文件

创建 application.yaml 配置文件,添加相关的配置项。内容如下:

  1. server:
  2. port: 8081 # 端口
  3. spring:
  4. application:
  5. name: order-service
  6. datasource:
  7. url: jdbc:mysql://127.0.0.1:3306/seata_order?useSSL=false&useUnicode=true&characterEncoding=UTF-8
  8. driver-class-name: com.mysql.jdbc.Driver
  9. username: root
  10. password:
  11. # Seata 配置项,对应 SeataProperties 类
  12. seata:
  13. application-id: ${spring.application.name} # Seata 应用编号,默认为 ${spring.application.name}
  14. tx-service-group: ${spring.application.name}-group # Seata 事务组编号,用于 TC 集群名
  15. # 服务配置项,对应 ServiceProperties 类
  16. service:
  17. # 虚拟组和分组的映射
  18. vgroup-mapping:
  19. order-service-group: default
  20. # 分组和 Seata 服务的映射
  21. grouplist:
  22. default: 127.0.0.1:8091

① spring.datasource 配置项,设置连接 seata_order 库。

② seata 配置项,设置 Seata 的配置项目,对应 SeataProperties 类。

  • application-id 配置项,对应 Seata 应用编号,默认为 ${spring.application.name}。实际上,可以不进行设置。
  • tx-service-group 配置项,Seata 事务组编号,用于 TC 集群名。

③ seata.service 配置项,Seata 服务配置项,对应 ServiceProperties 类。它主要用于 Seata 在事务分组的特殊设计,可见《Seata 文档 —— 事务分组专题》。如果不能理解的胖友,可以见如下图:

简单来说,就是多了一层虚拟映射。这里,我们直接设置 TC Server 的地址,为 127.0.0.1:8091

3.2.3 OrderController

创建 OrderController 类,提供 order/create 下单 HTTP API。代码如下:

  1. @RestController
  2. @RequestMapping("/order")
  3. public class OrderController {
  4. private Logger logger = LoggerFactory.getLogger(OrderController.class);
  5. @Autowired
  6. private OrderService orderService;
  7. @PostMapping("/create")
  8. public Integer createOrder(@RequestParam("userId") Long userId,
  9. @RequestParam("productId") Long productId,
  10. @RequestParam("price") Integer price) throws Exception {
  11. logger.info("[createOrder] 收到下单请求,用户:{}, 商品:{}, 价格:{}", userId, productId, price);
  12. return orderService.createOrder(userId, productId, price);
  13. }
  14. }
  • 该 API 中,会调用 OrderService 进行下单。
    友情提示:因为这个是示例项目,所以直接传入 price 参数,作为订单的金额,实际肯定不是这样的,哈哈哈~

3.2.4 OrderService

创建 OrderService 接口,定义了创建订单的方法。代码如下:

  1. /**
  2. * 订单 Service
  3. */
  4. public interface OrderService {
  5. /**
  6. * 创建订单
  7. *
  8. * @param userId 用户编号
  9. * @param productId 产品编号
  10. * @param price 价格
  11. * @return 订单编号
  12. * @throws Exception 创建订单失败,抛出异常
  13. */
  14. Integer createOrder(Long userId, Long productId, Integer price) throws Exception;
  15. }

3.2.5 OrderServiceImpl

创建 OrderServiceImpl 类,实现创建订单的方法。代码如下:

  1. @Service
  2. public class OrderServiceImpl implements OrderService {
  3. private Logger logger = LoggerFactory.getLogger(getClass());
  4. @Autowired
  5. private OrderDao orderDao;
  6. @Override
  7. @GlobalTransactional // <1>
  8. public Integer createOrder(Long userId, Long productId, Integer price) throws Exception {
  9. Integer amount = 1; // 购买数量,暂时设置为 1。
  10. logger.info("[createOrder] 当前 XID: {}", RootContext.getXID());
  11. // <2> 扣减库存
  12. this.reduceStock(productId, amount);
  13. // <3> 扣减余额
  14. this.reduceBalance(userId, price);
  15. // <4> 保存订单
  16. OrderDO order = new OrderDO().setUserId(userId).setProductId(productId).setPayAmount(amount * price);
  17. orderDao.saveOrder(order);
  18. logger.info("[createOrder] 保存订单: {}", order.getId());
  19. // 返回订单编号
  20. return order.getId();
  21. }
  22. private void reduceStock(Long productId, Integer amount) throws IOException {
  23. // 参数拼接
  24. JSONObject params = new JSONObject().fluentPut("productId", String.valueOf(productId))
  25. .fluentPut("amount", String.valueOf(amount));
  26. // 执行调用
  27. HttpResponse response = DefaultHttpExecutor.getInstance().executePost("http://127.0.0.1:8082", "/product/reduce-stock",
  28. params, HttpResponse.class);
  29. // 解析结果
  30. Boolean success = Boolean.valueOf(EntityUtils.toString(response.getEntity()));
  31. if (!success) {
  32. throw new RuntimeException("扣除库存失败");
  33. }
  34. }
  35. private void reduceBalance(Long userId, Integer price) throws IOException {
  36. // 参数拼接
  37. JSONObject params = new JSONObject().fluentPut("userId", String.valueOf(userId))
  38. .fluentPut("price", String.valueOf(price));
  39. // 执行调用
  40. HttpResponse response = DefaultHttpExecutor.getInstance().executePost("http://127.0.0.1:8083", "/account/reduce-balance",
  41. params, HttpResponse.class);
  42. // 解析结果
  43. Boolean success = Boolean.valueOf(EntityUtils.toString(response.getEntity()));
  44. if (!success) {
  45. throw new RuntimeException("扣除余额失败");
  46. }
  47. }
  48. }

<1> 处,在类上,添加 Seata @GlobalTransactional 注解,声明全局事务

<2> 处,调用 #reduceStock(productId, amount) 方法,通过 Apache HttpClient 远程 HTTP 调用商品服务,进行扣除库存。

其中,DefaultHttpExecutor 是 Seata 封装,在使用个 HttpClient 发起 HTTP 调用时,将 Seata 全局事务 XID 通过 Header 传递。不过有两点要注意:

  • 在使用 POST 请求时,DefaultHttpExecutor 暂时只支持 application/json 请求参数格式。所以,如果胖友想要 application/x-www-form-urlencoded 等格式,需要自己重新封装~
  • 针对返回结果的转换,DefaultHttpExecutor 暂时没有实现完成,代码如下图所示:

艿艿使用的是 Seata 1.1.0 版本,未来这块应该会实现。

另外,商品服务提供的 /product/reduce-stock 接口,通过返回 true 或 false 来表示扣除库存是否成功。因此,我们在 false 扣除失败时,抛出 RuntimeException 异常,从而实现全局事务的回滚。

<3> 处,调用 #reduceBalance(userId, price) 方法,通过 Apache HttpClient 远程 HTTP 调用账户服务,进行扣除余额。整体逻辑和 <2> 一致,就不重复哔哔。

<4> 处,在全部调用成功后,调用 OrderDao 保存订单。

3.2.6 OrderDao

创建 OrderDao 接口,定义保存订单的操作。代码如下:

  1. @Mapper
  2. @Repository
  3. public interface OrderDao {
  4. /**
  5. * 插入订单记录
  6. *
  7. * @param order 订单
  8. * @return 影响记录数量
  9. */
  10. @Insert("INSERT INTO orders (user_id, product_id, pay_amount) VALUES (#{userId}, #{productId}, #{payAmount})")
  11. @Options(useGeneratedKeys = true, keyColumn = "id", keyProperty = "id")
  12. int saveOrder(OrderDO order);
  13. }

其中,OrderDO 实体类,对应 orders 表。代码如下: 

  1. /**
  2. * 订单实体
  3. */
  4. public class OrderDO {
  5. /** 订单编号 **/
  6. private Integer id;
  7. /** 用户编号 **/
  8. private Long userId;
  9. /** 产品编号 **/
  10. private Long productId;
  11. /** 支付金额 **/
  12. private Integer payAmount;
  13. // ... 省略 setter/getter 方法
  14. }

3.2.7 OrderServiceApplication

创建 OrderServiceApplication 类,用于启动订单服务。代码如下:

  1. @SpringBootApplication
  2. public class OrderServiceApplication {
  3. public static void main(String[] args) {
  4. SpringApplication.run(OrderServiceApplication.class, args);
  5. }
  6. }

3.3 商品服务

新建 lab-52-seata-at-httpclient-demo-product-service 项目,作为商品服务。它主要提供 /product/reduce-stock 接口,实现扣除商品的库存逻辑。
友情提示:整体内容和「2.5 商品模块」一致。

3.3.1 引入依赖

创建 pom.xml 文件,引入相关的依赖。和「3.2.1 引入依赖」是一致的,就不重复“贴”出来了,胖友点击 pom.xml 文件查看。

3.3.2 配置文件

创建 application.yaml 配置文件,添加相关的配置项。和「3.2.2 配置文件」是一致的,就不重复“贴”出来了,胖友点击 application.yaml 文件查看。

3.3.3 ProductController

创建 ProductController 类,提供 /product/reduce-stock 扣除库存 HTTP API。代码如下:

  1. @RestController
  2. @RequestMapping("/product")
  3. public class ProductController {
  4. private Logger logger = LoggerFactory.getLogger(ProductController.class);
  5. @Autowired
  6. private ProductService productService;
  7. @PostMapping("/reduce-stock")
  8. public Boolean reduceStock(@RequestBody ProductReduceStockDTO productReduceStockDTO) {
  9. logger.info("[reduceStock] 收到减少库存请求, 商品:{}, 价格:{}", productReduceStockDTO.getProductId(),
  10. productReduceStockDTO.getAmount());
  11. try {
  12. productService.reduceStock(productReduceStockDTO.getProductId(), productReduceStockDTO.getAmount());
  13. // 正常扣除库存,返回 true
  14. return true;
  15. } catch (Exception e) {
  16. // 失败扣除库存,返回 false
  17. return false;
  18. }
  19. }
  20. }
  • 该 API 中,会调用 ProductService 进行扣除库存,最终通过返回结果为 true 或者 false,表示扣除库存成功或是失败。

其中,ProductReduceStockDTO 为商品减少库存 DTO 类,代码如下:

  1. public class ProductReduceStockDTO {
  2. /**
  3. * 商品编号
  4. */
  5. private Long productId;
  6. /**
  7. * 数量
  8. */
  9. private Integer amount;
  10. // ... 省略 setter/getter 方法
  11. }

3.3.4 ProductService

创建 ProductService 接口,定义了扣除库存的方法。代码如下:

  1. /**
  2. * 商品 Service
  3. */
  4. public interface ProductService {
  5. /**
  6. * 扣减库存
  7. *
  8. * @param productId 商品 ID
  9. * @param amount 扣减数量
  10. * @throws Exception 扣减失败时抛出异常
  11. */
  12. void reduceStock(Long productId, Integer amount) throws Exception;
  13. }

3.3.5 ProductServiceImpl

创建 ProductServiceImpl 类,实现扣减库存的方法。代码如下:

  1. @Service
  2. public class ProductServiceImpl implements ProductService {
  3. private Logger logger = LoggerFactory.getLogger(getClass());
  4. @Autowired
  5. private ProductDao productDao;
  6. @Override
  7. @Transactional // <1> 开启新事物
  8. public void reduceStock(Long productId, Integer amount) throws Exception {
  9. logger.info("[reduceStock] 当前 XID: {}", RootContext.getXID());
  10. // <2> 检查库存
  11. checkStock(productId, amount);
  12. logger.info("[reduceStock] 开始扣减 {} 库存", productId);
  13. // <3> 扣减库存
  14. int updateCount = productDao.reduceStock(productId, amount);
  15. // 扣除成功
  16. if (updateCount == 0) {
  17. logger.warn("[reduceStock] 扣除 {} 库存失败", productId);
  18. throw new Exception("库存不足");
  19. }
  20. // 扣除失败
  21. logger.info("[reduceStock] 扣除 {} 库存成功", productId);
  22. }
  23. private void checkStock(Long productId, Integer requiredAmount) throws Exception {
  24. logger.info("[checkStock] 检查 {} 库存", productId);
  25. Integer stock = productDao.getStock(productId);
  26. if (stock < requiredAmount) {
  27. logger.warn("[checkStock] {} 库存不足,当前库存: {}", productId, stock);
  28. throw new Exception("库存不足");
  29. }
  30. }
  31. }

<1> 处,在类上,添加了 Spring @Transactional 注解,声明本地事务。也就是说,此处会开启一个 seata_product 库的数据库事务。

<2> 处,检查库存是否足够,如果不够则抛出 Exception 异常。因为我们需要通过异常,回滚全局异常。

<3> 处,进行扣除库存,如果扣除失败则抛出 Exception 异常。

3.3.6 ProductDao

创建 ProductDao 接口,定义获取和扣除库存的操作。代码如下:

  1. @Mapper
  2. @Repository
  3. public interface ProductDao {
  4. /**
  5. * 获取库存
  6. *
  7. * @param productId 商品编号
  8. * @return 库存
  9. */
  10. @Select("SELECT stock FROM product WHERE id = #{productId}")
  11. Integer getStock(@Param("productId") Long productId);
  12. /**
  13. * 扣减库存
  14. *
  15. * @param productId 商品编号
  16. * @param amount 扣减数量
  17. * @return 影响记录行数
  18. */
  19. @Update("UPDATE product SET stock = stock - #{amount} WHERE id = #{productId} AND stock >= #{amount}")
  20. int reduceStock(@Param("productId") Long productId, @Param("amount") Integer amount);
  21. }

3.3.7 ProductServiceApplication

创建 ProductServiceApplication 类,用于启动商品服务。代码如下:

  1. @SpringBootApplication
  2. public class ProductServiceApplication {
  3. public static void main(String[] args) {
  4. SpringApplication.run(ProductServiceApplication.class, args);
  5. }
  6. }

3.4 账户服务

新建 lab-52-seata-at-httpclient-demo-account-service 项目,作为账户服务。它主要提供 /account/reduce-balance 接口,实现扣除账户的余额逻辑。
友情提示:整体内容和「2.5 账户模块」一致。

3.4.1 引入依赖

创建 pom.xml 文件,引入相关的依赖。和「3.2.1 引入依赖」是一致的,就不重复“贴”出来了,胖友点击 pom.xml 文件查看。

3.4.2 配置文件

创建 application.yaml 配置文件,添加相关的配置项。和「3.2.2 配置文件」是一致的,就不重复“贴”出来了,胖友点击 application.yaml 文件查看。

3.4.3 AccountController

创建 AccountController 类,提供 /account/reduce-balance 扣除余额 HTTP API。代码如下:

  1. @RestController
  2. @RequestMapping("/account")
  3. public class AccountController {
  4. private Logger logger = LoggerFactory.getLogger(AccountController.class);
  5. @Autowired
  6. private AccountService accountService;
  7. @PostMapping("/reduce-balance")
  8. public Boolean reduceBalance(@RequestBody AccountReduceBalanceDTO accountReduceBalanceDTO) {
  9. logger.info("[reduceBalance] 收到减少余额请求, 用户:{}, 金额:{}", accountReduceBalanceDTO.getUserId(),
  10. accountReduceBalanceDTO.getPrice());
  11. try {
  12. accountService.reduceBalance(accountReduceBalanceDTO.getUserId(), accountReduceBalanceDTO.getPrice());
  13. // 正常扣除余额,返回 true
  14. return true;
  15. } catch (Exception e) {
  16. // 失败扣除余额,返回 false
  17. return false;
  18. }
  19. }
  20. }
  • 该 API 中,会调用 AccountService 进行扣除余额,最终通过返回结果为 true 或者 false,表示扣除余额成功或是失败。

其中,AccountReduceBalanceDTO 为账户减少余额 DTO 类,代码如下:

  1. public class AccountReduceBalanceDTO {
  2. /**
  3. * 用户编号
  4. */
  5. private Long userId;
  6. /**
  7. * 扣减金额
  8. */
  9. private Integer price;
  10. public Long getUserId() {
  11. return userId;
  12. }
  13. // ... 省略 setter/getter 方法
  14. }

3.4.4 AccountService

创建 AccountService 类,定义扣除余额的方法。代码如下:

  1. /**
  2. * 账户 Service
  3. */
  4. public interface AccountService {
  5. /**
  6. * 扣除余额
  7. *
  8. * @param userId 用户编号
  9. * @param price 扣减金额
  10. * @throws Exception 失败时抛出异常
  11. */
  12. void reduceBalance(Long userId, Integer price) throws Exception;
  13. }

3.4.5 AccountServiceImpl

创建 AccountServiceImpl 类,实现扣除余额的方法。代码如下:

  1. @Service
  2. public class AccountServiceImpl implements AccountService {
  3. private Logger logger = LoggerFactory.getLogger(getClass());
  4. @Autowired
  5. private AccountDao accountDao;
  6. @Override
  7. @Transactional(propagation = Propagation.REQUIRES_NEW) // <1> 开启新事物
  8. public void reduceBalance(Long userId, Integer price) throws Exception {
  9. logger.info("[reduceBalance] 当前 XID: {}", RootContext.getXID());
  10. // <2> 检查余额
  11. checkBalance(userId, price);
  12. logger.info("[reduceBalance] 开始扣减用户 {} 余额", userId);
  13. // <3> 扣除余额
  14. int updateCount = accountDao.reduceBalance(price);
  15. // 扣除成功
  16. if (updateCount == 0) {
  17. logger.warn("[reduceBalance] 扣除用户 {} 余额失败", userId);
  18. throw new Exception("余额不足");
  19. }
  20. logger.info("[reduceBalance] 扣除用户 {} 余额成功", userId);
  21. }
  22. private void checkBalance(Long userId, Integer price) throws Exception {
  23. logger.info("[checkBalance] 检查用户 {} 余额", userId);
  24. Integer balance = accountDao.getBalance(userId);
  25. if (balance < price) {
  26. logger.warn("[checkBalance] 用户 {} 余额不足,当前余额:{}", userId, balance);
  27. throw new Exception("余额不足");
  28. }
  29. }
  30. }

<1> 处,在类上,添加了 Spring @Transactional 注解,声明本地事务。也就是说,此处会开启一个 seata_account 库的数据库事务。

<2> 处,检查余额是否足够,如果不够则抛出 Exception 异常。因为我们需要通过异常,回滚全局异常。

<3> 处,进行扣除余额,如果扣除失败则抛出 Exception 异常。

3.4.6 AccountDao

创建 AccountDao 接口,定义查询和扣除余额的操作。代码如下:

  1. @Mapper
  2. @Repository
  3. public interface AccountDao {
  4. /**
  5. * 获取账户余额
  6. *
  7. * @param userId 用户 ID
  8. * @return 账户余额
  9. */
  10. @Select("SELECT balance FROM account WHERE id = #{userId}")
  11. Integer getBalance(@Param("userId") Long userId);
  12. /**
  13. * 扣减余额
  14. *
  15. * @param price 需要扣减的数目
  16. * @return 影响记录行数
  17. */
  18. @Update("UPDATE account SET balance = balance - #{price} WHERE id = 1 AND balance >= ${price}")
  19. int reduceBalance(@Param("price") Integer price);
  20. }

3.4.7 AccountServiceApplication

创建 AccountServiceApplication 类,用于启动商品服务。代码如下:

  1. @SpringBootApplication
  2. public class AccountServiceApplication {
  3. public static void main(String[] args) {
  4. SpringApplication.run(AccountServiceApplication.class, args);
  5. }
  6. }

3.5 简单测试

下面,我们将测试两种情况:

  1. 分布式事务正常提交
  2. 分布式事务异常回滚

Debug 执行 OrderServiceApplication 启动订单服务。此时,我们可以看到 Seata 相关日志如下:
友情提示:日志的顺序,艿艿做了简单的整理,为了更容易阅读。

  1. # ... 上面还有 Seata 相关 Bean 初始化的日志,忘记加进来了,嘿嘿~
  2. # 给数据源增加 Seata 的数据源代理
  3. 2020-04-05 10:51:00.687 INFO 52124 --- [ main] s.s.a.d.SeataDataSourceBeanPostProcessor : Auto proxy of [dataSource]
  4. 2020-04-05 10:51:00.688 INFO 52124 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
  5. 2020-04-05 10:51:00.833 INFO 52124 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
  6. # 加载 Druid 提供的 SQL 解析器
  7. 2020-04-05 10:51:00.881 INFO 52124 --- [ main] i.s.common.loader.EnhancedServiceLoader : load DbTypeParser[druid] extension by class[io.seata.sqlparser.druid.DruidDelegatingDbTypeParser]
  8. # 连接到 Seata TC Server 服务器
  9. 2020-04-05 10:51:00.892 INFO 52124 --- [ main] i.s.c.r.netty.NettyClientChannelManager : will connect to 127.0.0.1:8091
  10. 2020-04-05 10:51:00.893 INFO 52124 --- [ main] i.s.core.rpc.netty.NettyPoolableFactory : NettyPool create channel to transactionRole:RMROLE,address:127.0.0.1:8091,msg:< RegisterRMRequest{resourceIds='jdbc:mysql://127.0.0.1:3306/seata_order', applicationId='order-service', transactionServiceGroup='order-service-group'} >
  11. # 加载 Seata 序列化器
  12. 2020-04-05 10:51:01.042 INFO 52124 --- [lector_RMROLE_1] i.s.common.loader.EnhancedServiceLoader : load Serializer[SEATA] extension by class[io.seata.serializer.seata.SeataSerializer]
  13. # 注册 Seata Resource Manager 到 Seata TC Server 成功
  14. 2020-04-05 10:51:00.892 INFO 52124 --- [ main] io.seata.core.rpc.netty.RmRpcClient : RM will register :jdbc:mysql://127.0.0.1:3306/seata_order
  15. 2020-04-05 10:51:01.054 INFO 52124 --- [ main] io.seata.core.rpc.netty.RmRpcClient : register RM success. server version:1.1.0,channel:[id: 0xc7553923, L:/127.0.0.1:64449 - R:/127.0.0.1:8091]
  16. 2020-04-05 10:51:01.061 INFO 52124 --- [ main] i.s.core.rpc.netty.NettyPoolableFactory : register success, cost 34 ms, version:1.1.0,role:RMROLE,channel:[id: 0xc7553923, L:/127.0.0.1:64449 - R:/127.0.0.1:8091]
  17. # 因为 OrderServiceImpl 添加了 `@GlobalTransactional` 注解,所以创建其代理,用于全局事务。
  18. 2020-04-05 10:51:01.157 INFO 52124 --- [ main] i.s.s.a.GlobalTransactionScanner : Bean[cn.iocoder.springboot.lab52.orderservice.service.OrderServiceImpl] with name [orderServiceImpl] would use interceptor [io.seata.spring.annotation.GlobalTransactionalInterceptor]

执行 ProductServiceApplication 启动商品服务。相关的日志,胖友自己瞅瞅。
执行 AccountServiceApplication 启动账户服务。相关的日志,胖友自己瞅瞅。

3.5.1 正常流程

① 先查询下目前数据库的数据情况。如下图所示:

② 使用 Postman 模拟调用 http://127.0.0.1:8081/order/create 创建订单的接口,如下图所示:

此时,在控制台打印日志如下图所示:

  • 订单服务:

  • 商品服务:

  • 账户服务:

再查询下目前数据库的数据情况。如下图所示:

3.5.2 异常流程

① 先查询下目前数据库的数据情况。如下图所示:

② 在 OrderServiceImpl 的 #createOrder(...) 方法上,打上断点如下图,方便我们看到 product 表的 balance 被减少:

友情提示:这里忘记截图了,稍后 IDEA 停留在该断点时,胖友可以去查询 product 表,会发现 balance 已经减少。

③ 使用 Postman 模拟调用 http://127.0.0.1:8081/order/create 创建订单的接口,如下图所示:

此时,在控制台打印日志如下图所示:

  • 订单服务:

  • 商品服务:

  • 账户服务:

再查询下目前数据库的数据情况。如下图所示:

相关文章