.NetCore中使用分布式事务DTM的二阶段消息

x33g5p2x  于9个月前 转载在 .NET  
字(5.8k)|赞(0)|评价(0)|浏览(384)

一、概述

二阶段消息是 DTM 新提出的,可以完美代替现有的事务消息和本地消息表架构。无论从复杂度、性能、便利性还是代码量都是完胜现有的方案。

相比现有的消息架构借助于各种消息中间件比如 RocketMQ 等, DTM 自己实现了无需额外的学习成本。它能够保证本地事务的提交和全局事务提交是“原子的”,适合解决不需要回滚的分布式事务场景

二阶段消息保证提交的原子性和如何保证业务成功执行如下时序图:

二阶段消息主要是指 PrepareSubmit 两个阶段,主程序向 DTM 服务发送 Prepare 消息,成功后执行本地事务,完成本地事务后发送 Submit 消息至 DTM 服务,之后 DTM 会调用分支事件执行其他服务,最后完成全局事务。

当发送了 Prepare 但是 Submit 没有提交的话,会进行回调请求来确认消息的情况,具体工作过程如下:

1、在处理本地事务时,会将 gid 插入到 barrier 表中,同时带上插入原因为 committed 。该表有一个唯一索引,主要字段为 gid

2、当进行回查时,二阶段消息的操作不是直接查 gid 是否存在,而是再 insert ignore 一条带有相同 gid 的数据,同时带上插入原因为 rollbacked 。此时如果表中如果已有 gid 的记录,那么新的插入操作就会被 ignore ,否则数据会被插入。

3、然后再用 gid 查询表中的记录,如果查到记录的 reasoncommitted ,那么说明本地事务已提交;如果查到记录的 reasonrollbacked ,那么说明本地事务已回滚。

二、安装DTM

我使用二进制包下载安装 地址 ,我是 Window 环境所以下载后解压,点击 dtm.exe 进行运行即可,如下启动成功

启动成功后可以访问 http://localhost:36789 ,进入管理后台

三、创建DTM所需的表

我们需要创建一个表处理消息的回查,表里保存全局事务ID,具体作用在后续说明,我这里用的SqlServer数据库,所以执行如下:

  1. CREATE TABLE [dbo].[barrier](
  2. [id] bigint NOT NULL IDENTITY(1,1) PRIMARY KEY,
  3. [trans_type] varchar(45) NOT NULL DEFAULT(''),
  4. [gid] varchar(128) NOT NULL DEFAULT(''),
  5. [branch_id] varchar(128) NOT NULL DEFAULT(''),
  6. [op] varchar(45) NOT NULL DEFAULT(''),
  7. [barrier_id] varchar(45) NOT NULL DEFAULT(''),
  8. [reason] varchar(45) NOT NULL DEFAULT(''),
  9. [create_time] datetime NOT NULL DEFAULT(getdate()) ,
  10. [update_time] datetime NOT NULL DEFAULT(getdate())
  11. )
  12. GO
  13. CREATE UNIQUE INDEX[ix_uniq_barrier] ON[dbo].[barrier]([gid] ASC, [branch_id] ASC, [op] ASC, [barrier_id] ASC)
  14. WITH(IGNORE_DUP_KEY = ON)
  15. GO

这里比较关键的是那个唯一索引,有一个 IGNORE_DUP_KEY = ON ,这个其实就是为了等价 mysqlinsert ignore 表示存在相关字段的信息则不插入,否则就插入数据

当然还支持很多其他的数据库,建表语句可以从这里查看 地址

 四、创建项目

我们简单的创建两个 .net core webapi 项目进行测试,两个项目都进行相同的如下操作:

** 1、安装Dtmcli和Microsoft.EntityFrameworkCore.SqlServer**

安装 Dtmcli 是因为其中已经帮我们集成了 DTM 客户端 SDK HTTP 版本,想要 GRPC 版本可以安装 Dtmgrpc

安装 Microsoft.EntityFrameworkCore.SqlServer 很显然是为了处理数据库。

  1. Install-Package Dtmcli
  2. Install-Package Microsoft.EntityFrameworkCore.SqlServer

2、配置

接下来我们配置服务,先在配置文件 appsetting.json 中添加如下

  1. "AppSettings": {
  2. "DtmUrl": "http://localhost:36789",
  3. "BusiUrl": "http://localhost:5056",
  4. "QueryPreparedUrl": "http://localhost:5046",
  5. "BarrierConn": "Data Source=.;Initial Catalog=HTGL;TrustServerCertificate=True;;Integrated Security=True"}

DtmUrlDTM 的监听地址, http 的是 36789grpc 的是 36790

BusiUrl :访问其他服务的地址

QueryPreparedUrl :回查的地址

BarrierConn :数据库连接语句

添加一个配置类:

  1. public classAppSettings
  2. {
  3. public string DtmUrl { get; set; }
  4. public string BusiUrl { get; set; }
  5. public string BarrierConn { get; set; }
  6. public string QueryPreparedUrl { get; set; }
  7. }

之后注入服务如下:

  1. builder.Services.AddDtmcli(dtm =>{
  2. dtm.DtmUrl = builder.Configuration.GetValue<string>("AppSettings:DtmUrl");
  3. dtm.SqlDbType =DtmCommon.Constant.Barrier.DBTYPE_SQLSERVER;
  4. dtm.BarrierSqlTableName = "[HTGL].[dbo].[barrier]";
  5. });
  6. builder.Services.Configure<AppSettings>(builder.Configuration.GetSection("AppSettings"));

SqlDbType :表示使用的数据库类型

BarrierSqlTableNameBarrier 表的名字

3、添加代码

我们在其中一个项目添加主程序代码如下:

  1. [ApiController]public classDtmController : ControllerBase
  2. {
  3. private readonly ILogger<DtmController>_logger;
  4. private readonlyIDtmClient _dtmClient;
  5. private readonlyIDtmTransFactory _transFactory;
  6. private readonlyAppSettings _settings;
  7. private readonlyIBranchBarrierFactory _factory;
  8. public DtmController(ILogger<DtmController> logger, IDtmClient dtmClient,IDtmTransFactory transFactory, IOptions<AppSettings>settings, IBranchBarrierFactory factory)
  9. {
  10. _logger =logger;
  11. _dtmClient =dtmClient;
  12. _transFactory =transFactory;
  13. _settings =settings.Value;
  14. _factory =factory;
  15. }
  16. private DbConnection GetConn() => newMicrosoft.Data.SqlClient.SqlConnection(_settings.BarrierConn);
  17. [HttpPost("post-dtm-msg")]
  18. public async Task<IActionResult>Get(CancellationToken cancellationToken)
  19. {
  20. //1、创建gid
  21. var gid = await_dtmClient.GenGid(cancellationToken);
  22. //2、设置分支事务
  23. var msg =_transFactory.NewMsg(gid)
  24. .Add(_settings.BusiUrl + "/TransOut", new { id = 123})
  25. .Add(_settings.BusiUrl + "/TransIn", new { id = 321});//3、执行submit
  26. using (DbConnection conn =GetConn())
  27. {
  28. await msg.DoAndSubmitDB(_settings.QueryPreparedUrl + "/msg-queryprepared", conn, async tx =>{
  29. //4、执行本地事务
  30. awaitTask.CompletedTask;
  31. });
  32. }
  33. _logger.LogInformation("result gid is {0}", gid);
  34. return Content("SUCCESS");
  35. }
  36. [HttpGet("msg-queryprepared")]
  37. public async Task<IActionResult>QueryPrepared(CancellationToken cancellationToken)
  38. {
  39. var bb =_factory.CreateBranchBarrier(Request.Query);
  40. _logger.LogInformation("bb {0}", bb);
  41. using (DbConnection conn =GetConn())
  42. {
  43. //回调查询消息状态
  44. var res = awaitbb.QueryPrepared(conn);
  45. return Ok(new { dtm_result =res });
  46. }
  47. }
  48. }

然后我们向另一个服务项目添加如下代码,作为一个简单的服务方法,没有任何操作只是返回成功:

  1. [ApiController]
  2. public classTransController : ControllerBase
  3. {
  4. private readonly ILogger<TransController>_logger;
  5. private readonlyIBranchBarrierFactory _factory;
  6. private readonlyAppSettings _settings;
  7. private DbConnection GetConn() => newMicrosoft.Data.SqlClient.SqlConnection(_settings.BarrierConn);
  8. public TransController(ILogger<TransController> logger, IBranchBarrierFactory factory, IOptions<AppSettings>settings)
  9. {
  10. _logger =logger;
  11. _factory =factory;
  12. _settings =settings.Value;
  13. }
  14. [HttpPost("TransIn")]
  15. public async Task<IResult>In()
  16. {
  17. return Results.Ok(new { dtm_result = "SUCCESS"});
  18. //return Results.Ok(new { dtm_result = "FAILURE" });
  19. }
  20. [HttpPost("TransOut")]
  21. public async Task<IResult>Out()
  22. {
  23. return Results.Ok(new { dtm_result = "SUCCESS"});
  24. }
  25. }

五、执行查看结果

我们正常执行,可以看到下面的动图结果,在执行完本地事务后会访问分支事务,然后数据库表中添加了一条记录

可以在管理后台看到我们请求成功的信息

如果要演示失败,需要做以下修改直接报错,我们可以看到访问了回调方法,然后数据库中看到 rollback 标记的消息

  1. using (DbConnection conn =GetConn())
  2. {
  3. await msg.DoAndSubmitDB(_settings.QueryPreparedUrl + "/msg-queryprepared", conn, async tx =>{
  4. throw new Exception("报错了");
  5. //4、执行本地事务
  6. awaitTask.CompletedTask;
  7. });
  8. }

提交后再宕机演示比较麻烦,我就不演示了,大家意会即可。

如果分支事务返回的不是SUCCESS而是FAILURE会由DTM隔一段时间重新请求,dtm对每个事务的重试是指数退避策略,具体为间隔是每失败一次,间隔加倍,避免过多的重试,导致系统负载异常上升。

如果您经过长时间的的宕机,因指数退避算法导致要很久才会重试。如果您想要手动触发立即重试,您可以手动把相应事务的next_cron_time(Redis存储引擎的该功能还在开发中)修改为当前时间,就会在数秒内被定时轮询,事务就会继续往前执行。

相关文章

最新文章

更多