Spring Cloud微服务系统基于Rocketmq可靠消息最终一致性实现分布式事务

x33g5p2x  于2021-11-01 转载在 Spring  
字(31.4k)|赞(0)|评价(0)|浏览(663)

一.订单使用事务消息,异步调用账户

订单添加事务消息

1.父项目添加spring Rocketmq依赖

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-spring-boot-starter</artifactId>
  4. <version>2.1.0</version>
  5. </dependency>

2.yml配置rocket连接和生产者组名

  1. rocketmq:
  2. name-server: 192.168.64.141:9876
  3. producer:
  4. group: order_producer

3.添加新的数据表tx_table,用来存储事务的执行状态

  1. drop database if exists `seata_order`;
  2. CREATE DATABASE `seata_order` charset utf8;
  3. use `seata_order`;
  4. CREATE TABLE `order` (
  5. `id` bigint(11) NOT NULL,
  6. `user_id` bigint(11) DEFAULT NULL COMMENT '用户id',
  7. `product_id` bigint(11) DEFAULT NULL COMMENT '产品id',
  8. `count` int(11) DEFAULT NULL COMMENT '数量',
  9. `money` decimal(11,0) DEFAULT NULL COMMENT '金额',
  10. PRIMARY KEY (`id`)
  11. ) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;
  12. ALTER TABLE `order` ADD COLUMN `status` int(1) DEFAULT NULL COMMENT '订单状态:0:创建中;1:已完结' AFTER `money` ;
  13. -- for AT mode you must to init this sql for you business database. the seata server not need it.
  14. CREATE TABLE IF NOT EXISTS `undo_log`
  15. (
  16. `branch_id` BIGINT(20) NOT NULL COMMENT 'branch transaction id',
  17. `xid` VARCHAR(100) NOT NULL COMMENT 'global transaction id',
  18. `context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
  19. `rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
  20. `log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
  21. `log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
  22. `log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
  23. UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
  24. ) ENGINE = InnoDB
  25. AUTO_INCREMENT = 1
  26. DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';
  27. CREATE TABLE IF NOT EXISTS segment
  28. (
  29. id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY COMMENT '自增主键',
  30. VERSION BIGINT DEFAULT 0 NOT NULL COMMENT '版本号',
  31. business_type VARCHAR(63) DEFAULT '' NOT NULL COMMENT '业务类型,唯一',
  32. max_id BIGINT DEFAULT 0 NOT NULL COMMENT '当前最大id',
  33. step INT DEFAULT 0 NULL COMMENT '步长',
  34. increment INT DEFAULT 1 NOT NULL COMMENT '每次id增量',
  35. remainder INT DEFAULT 0 NOT NULL COMMENT '余数',
  36. created_at BIGINT UNSIGNED NOT NULL COMMENT '创建时间',
  37. updated_at BIGINT UNSIGNED NOT NULL COMMENT '更新时间',
  38. CONSTRAINT uniq_business_type UNIQUE (business_type)
  39. ) CHARSET = utf8mb4
  40. ENGINE INNODB COMMENT '号段表';
  41. INSERT INTO segment
  42. (VERSION, business_type, max_id, step, increment, remainder, created_at, updated_at)
  43. VALUES (1, 'order_business', 1000, 1000, 1, 0, NOW(), NOW());
  44. CREATE TABLE tx_table(
  45. `xid` char(32) PRIMARY KEY COMMENT '事务id',
  46. `status` int COMMENT '0-提交,1-回滚,2-未知',
  47. `created_at` BIGINT UNSIGNED NOT NULL COMMENT '创建时间'
  48. );

4.添加实体类TxInfo和TxMapper

TxMapper.xml

  1. <?xml version="1.0" encoding="UTF-8" ?>
  2. <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
  3. <mapper namespace="cn.tedu.order.mapper.TxMapper" >
  4. <resultMap id="BaseResultMap" type="cn.tedu.order.tx.TxInfo" >
  5. <id column="xid" property="xid" jdbcType="CHAR" />
  6. <result column="created_at" property="created" jdbcType="BIGINT" />
  7. <result column="status" property="status" jdbcType="INTEGER"/>
  8. </resultMap>
  9. <insert id="insert">
  10. INSERT INTO `tx_table`(`xid`,`created_at`,`status`) VALUES(#{xid},#{created},#{status});
  11. </insert>
  12. <select id="exists" resultType="boolean">
  13. SELECT COUNT(1) FROM tx_table WHERE xid=#{xid};
  14. </select>
  15. <select id="selectById" resultMap="BaseResultMap">
  16. SELECT `xid`,`created_at`,`status` FROM tx_table WHERE xid=#{xid};
  17. </select>
  18. </mapper>

5.新建AccountMessage,用来封装发给账户的调用信息:userId,money,txId

  1. package cn.tedu.order.entity;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import java.math.BigDecimal;
  6. @Data
  7. @NoArgsConstructor
  8. @AllArgsConstructor
  9. public class AccountMessage {
  10. private Long userId;
  11. private BigDecimal money;
  12. private String xid;
  13. }

6.工具类:JsonUtil

  1. package cn.tedu.order.util;
  2. import java.io.File;
  3. import java.io.FileWriter;
  4. import java.io.IOException;
  5. import java.io.InputStream;
  6. import java.io.InputStreamReader;
  7. import java.io.Writer;
  8. import java.math.BigDecimal;
  9. import java.math.BigInteger;
  10. import java.net.URL;
  11. import java.nio.charset.StandardCharsets;
  12. import java.text.SimpleDateFormat;
  13. import java.util.ArrayList;
  14. import java.util.List;
  15. import org.apache.commons.lang3.StringUtils;
  16. import com.fasterxml.jackson.annotation.JsonInclude;
  17. import com.fasterxml.jackson.core.JsonGenerator;
  18. import com.fasterxml.jackson.core.JsonParser;
  19. import com.fasterxml.jackson.core.JsonProcessingException;
  20. import com.fasterxml.jackson.core.type.TypeReference;
  21. import com.fasterxml.jackson.databind.DeserializationFeature;
  22. import com.fasterxml.jackson.databind.JsonNode;
  23. import com.fasterxml.jackson.databind.ObjectMapper;
  24. import com.fasterxml.jackson.databind.SerializationFeature;
  25. import com.fasterxml.jackson.databind.node.ObjectNode;
  26. import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
  27. import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
  28. import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;
  29. import lombok.extern.slf4j.Slf4j;
  30. @Slf4j
  31. public class JsonUtil {
  32. private static ObjectMapper mapper;
  33. private static JsonInclude.Include DEFAULT_PROPERTY_INCLUSION = JsonInclude.Include.NON_DEFAULT;
  34. private static boolean IS_ENABLE_INDENT_OUTPUT = false;
  35. private static String CSV_DEFAULT_COLUMN_SEPARATOR = ",";
  36. static {
  37. try {
  38. initMapper();
  39. configPropertyInclusion();
  40. configIndentOutput();
  41. configCommon();
  42. } catch (Exception e) {
  43. log.error("jackson config error", e);
  44. }
  45. }
  46. private static void initMapper() {
  47. mapper = new ObjectMapper();
  48. }
  49. private static void configCommon() {
  50. config(mapper);
  51. }
  52. private static void configPropertyInclusion() {
  53. mapper.setSerializationInclusion(DEFAULT_PROPERTY_INCLUSION);
  54. }
  55. private static void configIndentOutput() {
  56. mapper.configure(SerializationFeature.INDENT_OUTPUT, IS_ENABLE_INDENT_OUTPUT);
  57. }
  58. private static void config(ObjectMapper objectMapper) {
  59. objectMapper.enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN);
  60. objectMapper.enable(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT);
  61. objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY);
  62. objectMapper.enable(DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY);
  63. objectMapper.enable(DeserializationFeature.FAIL_ON_NUMBERS_FOR_ENUMS);
  64. objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
  65. objectMapper.disable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES);
  66. objectMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
  67. objectMapper.enable(JsonParser.Feature.ALLOW_COMMENTS);
  68. objectMapper.disable(JsonGenerator.Feature.ESCAPE_NON_ASCII);
  69. objectMapper.enable(JsonGenerator.Feature.IGNORE_UNKNOWN);
  70. objectMapper.enable(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES);
  71. objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
  72. objectMapper.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
  73. objectMapper.enable(JsonParser.Feature.ALLOW_SINGLE_QUOTES);
  74. objectMapper.registerModule(new ParameterNamesModule());
  75. objectMapper.registerModule(new Jdk8Module());
  76. objectMapper.registerModule(new JavaTimeModule());
  77. }
  78. public static void setSerializationInclusion(JsonInclude.Include inclusion) {
  79. DEFAULT_PROPERTY_INCLUSION = inclusion;
  80. configPropertyInclusion();
  81. }
  82. public static void setIndentOutput(boolean isEnable) {
  83. IS_ENABLE_INDENT_OUTPUT = isEnable;
  84. configIndentOutput();
  85. }
  86. public static <V> V from(URL url, Class<V> c) {
  87. try {
  88. return mapper.readValue(url, c);
  89. } catch (IOException e) {
  90. log.error("jackson from error, url: {}, type: {}", url.getPath(), c, e);
  91. return null;
  92. }
  93. }
  94. public static <V> V from(InputStream inputStream, Class<V> c) {
  95. try {
  96. return mapper.readValue(inputStream, c);
  97. } catch (IOException e) {
  98. log.error("jackson from error, type: {}", c, e);
  99. return null;
  100. }
  101. }
  102. public static <V> V from(File file, Class<V> c) {
  103. try {
  104. return mapper.readValue(file, c);
  105. } catch (IOException e) {
  106. log.error("jackson from error, file path: {}, type: {}", file.getPath(), c, e);
  107. return null;
  108. }
  109. }
  110. public static <V> V from(Object jsonObj, Class<V> c) {
  111. try {
  112. return mapper.readValue(jsonObj.toString(), c);
  113. } catch (IOException e) {
  114. log.error("jackson from error, json: {}, type: {}", jsonObj.toString(), c, e);
  115. return null;
  116. }
  117. }
  118. public static <V> V from(String json, Class<V> c) {
  119. try {
  120. return mapper.readValue(json, c);
  121. } catch (IOException e) {
  122. log.error("jackson from error, json: {}, type: {}", json, c, e);
  123. return null;
  124. }
  125. }
  126. public static <V> V from(URL url, TypeReference<V> type) {
  127. try {
  128. return mapper.readValue(url, type);
  129. } catch (IOException e) {
  130. log.error("jackson from error, url: {}, type: {}", url.getPath(), type, e);
  131. return null;
  132. }
  133. }
  134. public static <V> V from(InputStream inputStream, TypeReference<V> type) {
  135. try {
  136. return mapper.readValue(inputStream, type);
  137. } catch (IOException e) {
  138. log.error("jackson from error, type: {}", type, e);
  139. return null;
  140. }
  141. }
  142. public static <V> V from(File file, TypeReference<V> type) {
  143. try {
  144. return mapper.readValue(file, type);
  145. } catch (IOException e) {
  146. log.error("jackson from error, file path: {}, type: {}", file.getPath(), type, e);
  147. return null;
  148. }
  149. }
  150. public static <V> V from(Object jsonObj, TypeReference<V> type) {
  151. try {
  152. return mapper.readValue(jsonObj.toString(), type);
  153. } catch (IOException e) {
  154. log.error("jackson from error, json: {}, type: {}", jsonObj.toString(), type, e);
  155. return null;
  156. }
  157. }
  158. public static <V> V from(String json, TypeReference<V> type) {
  159. try {
  160. return mapper.readValue(json, type);
  161. } catch (IOException e) {
  162. log.error("jackson from error, json: {}, type: {}", json, type, e);
  163. return null;
  164. }
  165. }
  166. public static <V> String to(List<V> list) {
  167. try {
  168. return mapper.writeValueAsString(list);
  169. } catch (JsonProcessingException e) {
  170. log.error("jackson to error, obj: {}", list, e);
  171. return null;
  172. }
  173. }
  174. public static <V> String to(V v) {
  175. try {
  176. return mapper.writeValueAsString(v);
  177. } catch (JsonProcessingException e) {
  178. log.error("jackson to error, obj: {}", v, e);
  179. return null;
  180. }
  181. }
  182. public static <V> void toFile(String path, List<V> list) {
  183. try (Writer writer = new FileWriter(new File(path), true)) {
  184. mapper.writer().writeValues(writer).writeAll(list);
  185. writer.flush();
  186. } catch (Exception e) {
  187. log.error("jackson to file error, path: {}, list: {}", path, list, e);
  188. }
  189. }
  190. public static <V> void toFile(String path, V v) {
  191. try (Writer writer = new FileWriter(new File(path), true)) {
  192. mapper.writer().writeValues(writer).write(v);
  193. writer.flush();
  194. } catch (Exception e) {
  195. log.error("jackson to file error, path: {}, obj: {}", path, v, e);
  196. }
  197. }
  198. public static String getString(String json, String key) {
  199. if (StringUtils.isEmpty(json)) {
  200. return null;
  201. }
  202. try {
  203. JsonNode node = mapper.readTree(json);
  204. if (null != node) {
  205. return node.get(key).asText();
  206. } else {
  207. return null;
  208. }
  209. } catch (IOException e) {
  210. log.error("jackson get string error, json: {}, key: {}", json, key, e);
  211. return null;
  212. }
  213. }
  214. public static Integer getInt(String json, String key) {
  215. if (StringUtils.isEmpty(json)) {
  216. return null;
  217. }
  218. try {
  219. JsonNode node = mapper.readTree(json);
  220. if (null != node) {
  221. return node.get(key).intValue();
  222. } else {
  223. return null;
  224. }
  225. } catch (IOException e) {
  226. log.error("jackson get int error, json: {}, key: {}", json, key, e);
  227. return null;
  228. }
  229. }
  230. public static Long getLong(String json, String key) {
  231. if (StringUtils.isEmpty(json)) {
  232. return null;
  233. }
  234. try {
  235. JsonNode node = mapper.readTree(json);
  236. if (null != node) {
  237. return node.get(key).longValue();
  238. } else {
  239. return null;
  240. }
  241. } catch (IOException e) {
  242. log.error("jackson get long error, json: {}, key: {}", json, key, e);
  243. return null;
  244. }
  245. }
  246. public static Double getDouble(String json, String key) {
  247. if (StringUtils.isEmpty(json)) {
  248. return null;
  249. }
  250. try {
  251. JsonNode node = mapper.readTree(json);
  252. if (null != node) {
  253. return node.get(key).doubleValue();
  254. } else {
  255. return null;
  256. }
  257. } catch (IOException e) {
  258. log.error("jackson get double error, json: {}, key: {}", json, key, e);
  259. return null;
  260. }
  261. }
  262. public static BigInteger getBigInteger(String json, String key) {
  263. if (StringUtils.isEmpty(json)) {
  264. return new BigInteger(String.valueOf(0.00));
  265. }
  266. try {
  267. JsonNode node = mapper.readTree(json);
  268. if (null != node) {
  269. return node.get(key).bigIntegerValue();
  270. } else {
  271. return null;
  272. }
  273. } catch (IOException e) {
  274. log.error("jackson get biginteger error, json: {}, key: {}", json, key, e);
  275. return null;
  276. }
  277. }
  278. public static BigDecimal getBigDecimal(String json, String key) {
  279. if (StringUtils.isEmpty(json)) {
  280. return null;
  281. }
  282. try {
  283. JsonNode node = mapper.readTree(json);
  284. if (null != node) {
  285. return node.get(key).decimalValue();
  286. } else {
  287. return null;
  288. }
  289. } catch (IOException e) {
  290. log.error("jackson get bigdecimal error, json: {}, key: {}", json, key, e);
  291. return null;
  292. }
  293. }
  294. public static boolean getBoolean(String json, String key) {
  295. if (StringUtils.isEmpty(json)) {
  296. return false;
  297. }
  298. try {
  299. JsonNode node = mapper.readTree(json);
  300. if (null != node) {
  301. return node.get(key).booleanValue();
  302. } else {
  303. return false;
  304. }
  305. } catch (IOException e) {
  306. log.error("jackson get boolean error, json: {}, key: {}", json, key, e);
  307. return false;
  308. }
  309. }
  310. public static byte[] getByte(String json, String key) {
  311. if (StringUtils.isEmpty(json)) {
  312. return null;
  313. }
  314. try {
  315. JsonNode node = mapper.readTree(json);
  316. if (null != node) {
  317. return node.get(key).binaryValue();
  318. } else {
  319. return null;
  320. }
  321. } catch (IOException e) {
  322. log.error("jackson get byte error, json: {}, key: {}", json, key, e);
  323. return null;
  324. }
  325. }
  326. public static <T> ArrayList<T> getList(String json, String key) {
  327. if (StringUtils.isEmpty(json)) {
  328. return null;
  329. }
  330. String string = getString(json, key);
  331. return from(string, new TypeReference<ArrayList<T>>() {});
  332. }
  333. public static <T> String add(String json, String key, T value) {
  334. try {
  335. JsonNode node = mapper.readTree(json);
  336. add(node, key, value);
  337. return node.toString();
  338. } catch (IOException e) {
  339. log.error("jackson add error, json: {}, key: {}, value: {}", json, key, value, e);
  340. return json;
  341. }
  342. }
  343. private static <T> void add(JsonNode jsonNode, String key, T value) {
  344. if (value instanceof String) {
  345. ((ObjectNode) jsonNode).put(key, (String) value);
  346. } else if (value instanceof Short) {
  347. ((ObjectNode) jsonNode).put(key, (Short) value);
  348. } else if (value instanceof Integer) {
  349. ((ObjectNode) jsonNode).put(key, (Integer) value);
  350. } else if (value instanceof Long) {
  351. ((ObjectNode) jsonNode).put(key, (Long) value);
  352. } else if (value instanceof Float) {
  353. ((ObjectNode) jsonNode).put(key, (Float) value);
  354. } else if (value instanceof Double) {
  355. ((ObjectNode) jsonNode).put(key, (Double) value);
  356. } else if (value instanceof BigDecimal) {
  357. ((ObjectNode) jsonNode).put(key, (BigDecimal) value);
  358. } else if (value instanceof BigInteger) {
  359. ((ObjectNode) jsonNode).put(key, (BigInteger) value);
  360. } else if (value instanceof Boolean) {
  361. ((ObjectNode) jsonNode).put(key, (Boolean) value);
  362. } else if (value instanceof byte[]) {
  363. ((ObjectNode) jsonNode).put(key, (byte[]) value);
  364. } else {
  365. ((ObjectNode) jsonNode).put(key, to(value));
  366. }
  367. }
  368. public static String remove(String json, String key) {
  369. try {
  370. JsonNode node = mapper.readTree(json);
  371. ((ObjectNode) node).remove(key);
  372. return node.toString();
  373. } catch (IOException e) {
  374. log.error("jackson remove error, json: {}, key: {}", json, key, e);
  375. return json;
  376. }
  377. }
  378. public static <T> String update(String json, String key, T value) {
  379. try {
  380. JsonNode node = mapper.readTree(json);
  381. ((ObjectNode) node).remove(key);
  382. add(node, key, value);
  383. return node.toString();
  384. } catch (IOException e) {
  385. log.error("jackson update error, json: {}, key: {}, value: {}", json, key, value, e);
  386. return json;
  387. }
  388. }
  389. public static String format(String json) {
  390. try {
  391. JsonNode node = mapper.readTree(json);
  392. return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(node);
  393. } catch (IOException e) {
  394. log.error("jackson format json error, json: {}", json, e);
  395. return json;
  396. }
  397. }
  398. public static boolean isJson(String json) {
  399. try {
  400. mapper.readTree(json);
  401. return true;
  402. } catch (Exception e) {
  403. log.error("jackson check json error, json: {}", json, e);
  404. return false;
  405. }
  406. }
  407. private static InputStream getResourceStream(String name) {
  408. return JsonUtil.class.getClassLoader().getResourceAsStream(name);
  409. }
  410. private static InputStreamReader getResourceReader(InputStream inputStream) {
  411. if (null == inputStream) {
  412. return null;
  413. }
  414. return new InputStreamReader(inputStream, StandardCharsets.UTF_8);
  415. }
  416. }

7.OrderServiceImpl发送事务消息

  1. package cn.tedu.order.service;
  2. import cn.tedu.order.entity.AccountMessage;
  3. import cn.tedu.order.entity.Order;
  4. import cn.tedu.order.entity.TxInfo;
  5. import cn.tedu.order.fegin.AccountClient;
  6. import cn.tedu.order.fegin.EasyIdClient;
  7. import cn.tedu.order.fegin.StorageClient;
  8. import cn.tedu.order.mapper.OrderMapper;
  9. import cn.tedu.order.mapper.TxMapper;
  10. import cn.tedu.order.util.JsonUtil;
  11. import lombok.extern.slf4j.Slf4j;
  12. import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
  13. import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
  14. import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
  15. import org.apache.rocketmq.spring.core.RocketMQTemplate;
  16. import org.springframework.beans.factory.annotation.Autowired;
  17. import org.springframework.messaging.Message;
  18. import org.springframework.messaging.support.MessageBuilder;
  19. import org.springframework.stereotype.Service;
  20. import org.springframework.transaction.annotation.Transactional;
  21. import java.util.Random;
  22. import java.util.UUID;
  23. @Slf4j
  24. @Service
  25. @RocketMQTransactionListener
  26. public class OrderServiceImpl
  27. implements OrderService, RocketMQLocalTransactionListener {
  28. @Autowired
  29. private OrderMapper orderMapper;
  30. @Autowired
  31. private EasyIdClient easyIdClient;
  32. @Autowired
  33. private AccountClient accountClient;
  34. @Autowired
  35. private StorageClient storageClient;
  36. @Autowired
  37. private RocketMQTemplate t;
  38. @Autowired
  39. private TxMapper txMapper;
  40. // 业务方法,不直接完成业务,而是发送事务消息
  41. // 通过发送事务消息,会触发监听器执行业务
  42. @Override
  43. public void create(Order order) {
  44. // 准备消息数据
  45. String xid = UUID.randomUUID().toString().replace("-", "");
  46. AccountMessage am = new AccountMessage(order.getUserId(),order.getMoney(),xid);
  47. String json = JsonUtil.to(am);
  48. // 把 json 字符串,封装到spring 的通用 Message 对象
  49. Message<String> msg = MessageBuilder.withPayload(json).build();
  50. // 发送事务消息
  51. // 如果有标签: orderTopic:TagA
  52. // t.sendMessageInTransaction("orderTopic", msg, 触发监听器执行业务时需要的业务数据参数);
  53. t.sendMessageInTransaction("orderTopic", msg, order);
  54. }
  55. public void doCreate(Order order) {
  56. // 远程调用发号器,生成订单id
  57. String s = easyIdClient.nextId("order_business");
  58. Long id = Long.valueOf(s);
  59. order.setId(id);
  60. orderMapper.create(order);
  61. }
  62. // 执行本地事务
  63. @Transactional
  64. @Override
  65. public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
  66. RocketMQLocalTransactionState state; // 用来返回状态
  67. Integer status; // 用来在数据库中保存状态
  68. try {
  69. doCreate((Order) o);
  70. state = RocketMQLocalTransactionState.COMMIT;
  71. status = 0;
  72. } catch (Exception e) {
  73. log.error("创建订单失败", e);
  74. state = RocketMQLocalTransactionState.ROLLBACK;
  75. status = 1;
  76. }
  77. // message - {userId:6, money:20, xid:j5hg345ytg}
  78. String json = new String((byte[]) message.getPayload());
  79. String xid = JsonUtil.getString(json, "xid");
  80. txMapper.insert(new TxInfo(xid, status, System.currentTimeMillis()));
  81. return state;
  82. }
  83. // 处理事务状态回查
  84. @Override
  85. public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
  86. String json = new String((byte[]) message.getPayload());
  87. String xid = JsonUtil.getString(json, "xid");
  88. TxInfo txInfo = txMapper.selectById(xid);
  89. if (txInfo == null) {
  90. return RocketMQLocalTransactionState.UNKNOWN;
  91. }
  92. switch (txInfo.getStatus()) {
  93. case 0: return RocketMQLocalTransactionState.COMMIT;
  94. case 1: return RocketMQLocalTransactionState.ROLLBACK;
  95. default: return RocketMQLocalTransactionState.UNKNOWN;
  96. }
  97. }
  98. }

8.实现事务监听器

TxListener 事务监听器
发送事务消息后会触发事务监听器执行。

事务监听器有两个方法:

  • executeLocalTransaction(): 执行本地事务
  • checkLocalTransaction(): 负责响应Rocketmq服务器的事务回查操作

9.测试

按顺序启动项目:

1.Eureka
2.Easy Id Generator
3.Order
调用保存订单,地址:

观察控制台日志:

订单表:

事务表:

二.账户接收消息,扣减账户

1.依赖

2.yml配置name server

  1. rocketmq:
  2. name-server: 192.168.64.141:9876

3.AccountMessage

  1. package cn.tedu.account.entity;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import java.math.BigDecimal;
  6. @Data
  7. @NoArgsConstructor
  8. @AllArgsConstructor
  9. public class AccountMessage {
  10. private Long userId;
  11. private BigDecimal money;
  12. private String xid;
  13. }

4.jsonUtil

  1. package cn.tedu.account.util;
  2. import com.fasterxml.jackson.annotation.JsonInclude;
  3. import com.fasterxml.jackson.core.JsonGenerator;
  4. import com.fasterxml.jackson.core.JsonParser;
  5. import com.fasterxml.jackson.core.JsonProcessingException;
  6. import com.fasterxml.jackson.core.type.TypeReference;
  7. import com.fasterxml.jackson.databind.DeserializationFeature;
  8. import com.fasterxml.jackson.databind.JsonNode;
  9. import com.fasterxml.jackson.databind.ObjectMapper;
  10. import com.fasterxml.jackson.databind.SerializationFeature;
  11. import com.fasterxml.jackson.databind.node.ObjectNode;
  12. import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
  13. import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
  14. import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;
  15. import lombok.extern.slf4j.Slf4j;
  16. import org.apache.commons.lang3.StringUtils;
  17. import java.io.*;
  18. import java.math.BigDecimal;
  19. import java.math.BigInteger;
  20. import java.net.URL;
  21. import java.nio.charset.StandardCharsets;
  22. import java.text.SimpleDateFormat;
  23. import java.util.ArrayList;
  24. import java.util.List;
  25. @Slf4j
  26. public class JsonUtil {
  27. private static ObjectMapper mapper;
  28. private static JsonInclude.Include DEFAULT_PROPERTY_INCLUSION = JsonInclude.Include.NON_DEFAULT;
  29. private static boolean IS_ENABLE_INDENT_OUTPUT = false;
  30. private static String CSV_DEFAULT_COLUMN_SEPARATOR = ",";
  31. static {
  32. try {
  33. initMapper();
  34. configPropertyInclusion();
  35. configIndentOutput();
  36. configCommon();
  37. } catch (Exception e) {
  38. log.error("jackson config error", e);
  39. }
  40. }
  41. private static void initMapper() {
  42. mapper = new ObjectMapper();
  43. }
  44. private static void configCommon() {
  45. config(mapper);
  46. }
  47. private static void configPropertyInclusion() {
  48. mapper.setSerializationInclusion(DEFAULT_PROPERTY_INCLUSION);
  49. }
  50. private static void configIndentOutput() {
  51. mapper.configure(SerializationFeature.INDENT_OUTPUT, IS_ENABLE_INDENT_OUTPUT);
  52. }
  53. private static void config(ObjectMapper objectMapper) {
  54. objectMapper.enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN);
  55. objectMapper.enable(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT);
  56. objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY);
  57. objectMapper.enable(DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY);
  58. objectMapper.enable(DeserializationFeature.FAIL_ON_NUMBERS_FOR_ENUMS);
  59. objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
  60. objectMapper.disable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES);
  61. objectMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
  62. objectMapper.enable(JsonParser.Feature.ALLOW_COMMENTS);
  63. objectMapper.disable(JsonGenerator.Feature.ESCAPE_NON_ASCII);
  64. objectMapper.enable(JsonGenerator.Feature.IGNORE_UNKNOWN);
  65. objectMapper.enable(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES);
  66. objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
  67. objectMapper.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
  68. objectMapper.enable(JsonParser.Feature.ALLOW_SINGLE_QUOTES);
  69. objectMapper.registerModule(new ParameterNamesModule());
  70. objectMapper.registerModule(new Jdk8Module());
  71. objectMapper.registerModule(new JavaTimeModule());
  72. }
  73. public static void setSerializationInclusion(JsonInclude.Include inclusion) {
  74. DEFAULT_PROPERTY_INCLUSION = inclusion;
  75. configPropertyInclusion();
  76. }
  77. public static void setIndentOutput(boolean isEnable) {
  78. IS_ENABLE_INDENT_OUTPUT = isEnable;
  79. configIndentOutput();
  80. }
  81. public static <V> V from(URL url, Class<V> c) {
  82. try {
  83. return mapper.readValue(url, c);
  84. } catch (IOException e) {
  85. log.error("jackson from error, url: {}, type: {}", url.getPath(), c, e);
  86. return null;
  87. }
  88. }
  89. public static <V> V from(InputStream inputStream, Class<V> c) {
  90. try {
  91. return mapper.readValue(inputStream, c);
  92. } catch (IOException e) {
  93. log.error("jackson from error, type: {}", c, e);
  94. return null;
  95. }
  96. }
  97. public static <V> V from(File file, Class<V> c) {
  98. try {
  99. return mapper.readValue(file, c);
  100. } catch (IOException e) {
  101. log.error("jackson from error, file path: {}, type: {}", file.getPath(), c, e);
  102. return null;
  103. }
  104. }
  105. public static <V> V from(Object jsonObj, Class<V> c) {
  106. try {
  107. return mapper.readValue(jsonObj.toString(), c);
  108. } catch (IOException e) {
  109. log.error("jackson from error, json: {}, type: {}", jsonObj.toString(), c, e);
  110. return null;
  111. }
  112. }
  113. public static <V> V from(String json, Class<V> c) {
  114. try {
  115. return mapper.readValue(json, c);
  116. } catch (IOException e) {
  117. log.error("jackson from error, json: {}, type: {}", json, c, e);
  118. return null;
  119. }
  120. }
  121. public static <V> V from(URL url, TypeReference<V> type) {
  122. try {
  123. return mapper.readValue(url, type);
  124. } catch (IOException e) {
  125. log.error("jackson from error, url: {}, type: {}", url.getPath(), type, e);
  126. return null;
  127. }
  128. }
  129. public static <V> V from(InputStream inputStream, TypeReference<V> type) {
  130. try {
  131. return mapper.readValue(inputStream, type);
  132. } catch (IOException e) {
  133. log.error("jackson from error, type: {}", type, e);
  134. return null;
  135. }
  136. }
  137. public static <V> V from(File file, TypeReference<V> type) {
  138. try {
  139. return mapper.readValue(file, type);
  140. } catch (IOException e) {
  141. log.error("jackson from error, file path: {}, type: {}", file.getPath(), type, e);
  142. return null;
  143. }
  144. }
  145. public static <V> V from(Object jsonObj, TypeReference<V> type) {
  146. try {
  147. return mapper.readValue(jsonObj.toString(), type);
  148. } catch (IOException e) {
  149. log.error("jackson from error, json: {}, type: {}", jsonObj.toString(), type, e);
  150. return null;
  151. }
  152. }
  153. public static <V> V from(String json, TypeReference<V> type) {
  154. try {
  155. return mapper.readValue(json, type);
  156. } catch (IOException e) {
  157. log.error("jackson from error, json: {}, type: {}", json, type, e);
  158. return null;
  159. }
  160. }
  161. public static <V> String to(List<V> list) {
  162. try {
  163. return mapper.writeValueAsString(list);
  164. } catch (JsonProcessingException e) {
  165. log.error("jackson to error, obj: {}", list, e);
  166. return null;
  167. }
  168. }
  169. public static <V> String to(V v) {
  170. try {
  171. return mapper.writeValueAsString(v);
  172. } catch (JsonProcessingException e) {
  173. log.error("jackson to error, obj: {}", v, e);
  174. return null;
  175. }
  176. }
  177. public static <V> void toFile(String path, List<V> list) {
  178. try (Writer writer = new FileWriter(new File(path), true)) {
  179. mapper.writer().writeValues(writer).writeAll(list);
  180. writer.flush();
  181. } catch (Exception e) {
  182. log.error("jackson to file error, path: {}, list: {}", path, list, e);
  183. }
  184. }
  185. public static <V> void toFile(String path, V v) {
  186. try (Writer writer = new FileWriter(new File(path), true)) {
  187. mapper.writer().writeValues(writer).write(v);
  188. writer.flush();
  189. } catch (Exception e) {
  190. log.error("jackson to file error, path: {}, obj: {}", path, v, e);
  191. }
  192. }
  193. public static String getString(String json, String key) {
  194. if (StringUtils.isEmpty(json)) {
  195. return null;
  196. }
  197. try {
  198. JsonNode node = mapper.readTree(json);
  199. if (null != node) {
  200. return node.get(key).asText();
  201. } else {
  202. return null;
  203. }
  204. } catch (IOException e) {
  205. log.error("jackson get string error, json: {}, key: {}", json, key, e);
  206. return null;
  207. }
  208. }
  209. public static Integer getInt(String json, String key) {
  210. if (StringUtils.isEmpty(json)) {
  211. return null;
  212. }
  213. try {
  214. JsonNode node = mapper.readTree(json);
  215. if (null != node) {
  216. return node.get(key).intValue();
  217. } else {
  218. return null;
  219. }
  220. } catch (IOException e) {
  221. log.error("jackson get int error, json: {}, key: {}", json, key, e);
  222. return null;
  223. }
  224. }
  225. public static Long getLong(String json, String key) {
  226. if (StringUtils.isEmpty(json)) {
  227. return null;
  228. }
  229. try {
  230. JsonNode node = mapper.readTree(json);
  231. if (null != node) {
  232. return node.get(key).longValue();
  233. } else {
  234. return null;
  235. }
  236. } catch (IOException e) {
  237. log.error("jackson get long error, json: {}, key: {}", json, key, e);
  238. return null;
  239. }
  240. }
  241. public static Double getDouble(String json, String key) {
  242. if (StringUtils.isEmpty(json)) {
  243. return null;
  244. }
  245. try {
  246. JsonNode node = mapper.readTree(json);
  247. if (null != node) {
  248. return node.get(key).doubleValue();
  249. } else {
  250. return null;
  251. }
  252. } catch (IOException e) {
  253. log.error("jackson get double error, json: {}, key: {}", json, key, e);
  254. return null;
  255. }
  256. }
  257. public static BigInteger getBigInteger(String json, String key) {
  258. if (StringUtils.isEmpty(json)) {
  259. return new BigInteger(String.valueOf(0.00));
  260. }
  261. try {
  262. JsonNode node = mapper.readTree(json);
  263. if (null != node) {
  264. return node.get(key).bigIntegerValue();
  265. } else {
  266. return null;
  267. }
  268. } catch (IOException e) {
  269. log.error("jackson get biginteger error, json: {}, key: {}", json, key, e);
  270. return null;
  271. }
  272. }
  273. public static BigDecimal getBigDecimal(String json, String key) {
  274. if (StringUtils.isEmpty(json)) {
  275. return null;
  276. }
  277. try {
  278. JsonNode node = mapper.readTree(json);
  279. if (null != node) {
  280. return node.get(key).decimalValue();
  281. } else {
  282. return null;
  283. }
  284. } catch (IOException e) {
  285. log.error("jackson get bigdecimal error, json: {}, key: {}", json, key, e);
  286. return null;
  287. }
  288. }
  289. public static boolean getBoolean(String json, String key) {
  290. if (StringUtils.isEmpty(json)) {
  291. return false;
  292. }
  293. try {
  294. JsonNode node = mapper.readTree(json);
  295. if (null != node) {
  296. return node.get(key).booleanValue();
  297. } else {
  298. return false;
  299. }
  300. } catch (IOException e) {
  301. log.error("jackson get boolean error, json: {}, key: {}", json, key, e);
  302. return false;
  303. }
  304. }
  305. public static byte[] getByte(String json, String key) {
  306. if (StringUtils.isEmpty(json)) {
  307. return null;
  308. }
  309. try {
  310. JsonNode node = mapper.readTree(json);
  311. if (null != node) {
  312. return node.get(key).binaryValue();
  313. } else {
  314. return null;
  315. }
  316. } catch (IOException e) {
  317. log.error("jackson get byte error, json: {}, key: {}", json, key, e);
  318. return null;
  319. }
  320. }
  321. public static <T> ArrayList<T> getList(String json, String key) {
  322. if (StringUtils.isEmpty(json)) {
  323. return null;
  324. }
  325. String string = getString(json, key);
  326. return from(string, new TypeReference<ArrayList<T>>() {});
  327. }
  328. public static <T> String add(String json, String key, T value) {
  329. try {
  330. JsonNode node = mapper.readTree(json);
  331. add(node, key, value);
  332. return node.toString();
  333. } catch (IOException e) {
  334. log.error("jackson add error, json: {}, key: {}, value: {}", json, key, value, e);
  335. return json;
  336. }
  337. }
  338. private static <T> void add(JsonNode jsonNode, String key, T value) {
  339. if (value instanceof String) {
  340. ((ObjectNode) jsonNode).put(key, (String) value);
  341. } else if (value instanceof Short) {
  342. ((ObjectNode) jsonNode).put(key, (Short) value);
  343. } else if (value instanceof Integer) {
  344. ((ObjectNode) jsonNode).put(key, (Integer) value);
  345. } else if (value instanceof Long) {
  346. ((ObjectNode) jsonNode).put(key, (Long) value);
  347. } else if (value instanceof Float) {
  348. ((ObjectNode) jsonNode).put(key, (Float) value);
  349. } else if (value instanceof Double) {
  350. ((ObjectNode) jsonNode).put(key, (Double) value);
  351. } else if (value instanceof BigDecimal) {
  352. ((ObjectNode) jsonNode).put(key, (BigDecimal) value);
  353. } else if (value instanceof BigInteger) {
  354. ((ObjectNode) jsonNode).put(key, (BigInteger) value);
  355. } else if (value instanceof Boolean) {
  356. ((ObjectNode) jsonNode).put(key, (Boolean) value);
  357. } else if (value instanceof byte[]) {
  358. ((ObjectNode) jsonNode).put(key, (byte[]) value);
  359. } else {
  360. ((ObjectNode) jsonNode).put(key, to(value));
  361. }
  362. }
  363. public static String remove(String json, String key) {
  364. try {
  365. JsonNode node = mapper.readTree(json);
  366. ((ObjectNode) node).remove(key);
  367. return node.toString();
  368. } catch (IOException e) {
  369. log.error("jackson remove error, json: {}, key: {}", json, key, e);
  370. return json;
  371. }
  372. }
  373. public static <T> String update(String json, String key, T value) {
  374. try {
  375. JsonNode node = mapper.readTree(json);
  376. ((ObjectNode) node).remove(key);
  377. add(node, key, value);
  378. return node.toString();
  379. } catch (IOException e) {
  380. log.error("jackson update error, json: {}, key: {}, value: {}", json, key, value, e);
  381. return json;
  382. }
  383. }
  384. public static String format(String json) {
  385. try {
  386. JsonNode node = mapper.readTree(json);
  387. return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(node);
  388. } catch (IOException e) {
  389. log.error("jackson format json error, json: {}", json, e);
  390. return json;
  391. }
  392. }
  393. public static boolean isJson(String json) {
  394. try {
  395. mapper.readTree(json);
  396. return true;
  397. } catch (Exception e) {
  398. log.error("jackson check json error, json: {}", json, e);
  399. return false;
  400. }
  401. }
  402. private static InputStream getResourceStream(String name) {
  403. return JsonUtil.class.getClassLoader().getResourceAsStream(name);
  404. }
  405. private static InputStreamReader getResourceReader(InputStream inputStream) {
  406. if (null == inputStream) {
  407. return null;
  408. }
  409. return new InputStreamReader(inputStream, StandardCharsets.UTF_8);
  410. }
  411. }

5. 新建消费者类: AccountConsumer,实现消费者接口

  1. package cn.tedu.account.tx;
  2. import cn.tedu.account.entity.AccountMessage;
  3. import cn.tedu.account.mapper.AccountMapper;
  4. import cn.tedu.account.service.AccountService;
  5. import cn.tedu.account.util.JsonUtil;
  6. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
  7. import org.apache.rocketmq.spring.core.RocketMQListener;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.stereotype.Component;
  10. @Component
  11. @RocketMQMessageListener(topic = "orderTopic",
  12. consumerGroup = "account-consumer")
  13. public class AccountConsumer implements RocketMQListener<String> {
  14. @Autowired
  15. private AccountService accountService;
  16. @Override
  17. public void onMessage(String json) {
  18. //json--->AccountMessage
  19. AccountMessage am = JsonUtil.from(json, AccountMessage.class);
  20. accountService.decrease(am.getUserId(), am.getMoney());
  21. }
  22. }

6. 通过注解配置接收消息

  1. package cn.tedu.account.service;
  2. import cn.tedu.account.mapper.AccountMapper;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Service;
  5. import java.math.BigDecimal;
  6. @Service
  7. public class AccountServiceImpl implements AccountService{
  8. @Autowired
  9. private AccountMapper accountMapper;
  10. @Override
  11. public void decrease(Long userId, BigDecimal money) {
  12. accountMapper.decrease(userId, money);
  13. }

}

7. 扣减账户

TxConsumer 实现消息监听,收到消息后完成扣减金额业务:

8.启动 account 项目进行测试

按顺序启动项目:

Eureka
Easy Id Generator
Account
Order
account 项目启动时,会立即从 Rocketmq 收到消息,执行账户扣减业务:

相关文章