文章23 | 阅读 16343 | 点赞0
本文基于seata 1.3.0版本
ServerOnRequestProcessor是TC用于处理全局或者分支事务请求的处理器,它主要处理八种类型的请求:
因为涉及内容较多,本文只介绍全局事务开启和分支事务注册的处理逻辑,其他的内容在以后的文章说明。
本小节介绍的是在真正处理开始前请求的转发流程,处理每个请求时,转发流程都是一样的,后面在介绍请求处理时,将不再介绍转发内容。
在文章《Seata解析-seata核心类NettyRemotingServer详解》里面已经介绍了请求消息最终是转发到ServerOnRequestProcessor的onRequestMessage方法。下面再来看一下这个方法:
//代码有删减
private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
Object message = rpcMessage.getBody();
//获取RpcContext上下文对象
RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
//MergedWarpMessage表示当前的消息是一个复合消息,里面包含了多个消息,需要对每个消息进行遍历
if (message instanceof MergedWarpMessage) {
AbstractResultMessage[] results = new AbstractResultMessage[((MergedWarpMessage) message).msgs.size()];
for (int i = 0; i < results.length; i++) {
final AbstractMessage subMessage = ((MergedWarpMessage) message).msgs.get(i);
//调用协调器DefaultCoordinator处理
results[i] = transactionMessageHandler.onRequest(subMessage, rpcContext);
}
MergeResultMessage resultMessage = new MergeResultMessage();
resultMessage.setMsgs(results);
//异步发送响应消息
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);
} else {
final AbstractMessage msg = (AbstractMessage) message;
//调用协调器DefaultCoordinator处理
AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
//异步发送响应消息
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
}
}
从onRequestMessage方法中可以看到,ServerOnRequestProcessor又将这八种类型的消息转发给了transactionMessageHandler。transactionMessageHandler其实在TC启动的时候,在Server类中设置的:
DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
coordinator.init();
//在下面这行代码中设置transactionMessageHandler=coordinator
nettyRemotingServer.setHandler(coordinator);
从上面代码可以看到ServerOnRequestProcessor又将请求转发给了协调器DefaultCoordinator对象,下面是协调器的onRequest方法:
public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
if (!(request instanceof AbstractTransactionRequestToTC)) {
throw new IllegalArgumentException();
}
AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
//设置处理器,设置每个请求对象的hadler属性为协调器对象
transactionRequest.setTCInboundHandler(this);
//调用请求对象的handle方法
return transactionRequest.handle(context);
}
在onRequest方法里面,又去调用了每个事务请求对象的handle方法,而每个请求对象的handle方法都是相同的:
public AbstractTransactionResponse handle(RpcContext rpcContext) {
//handler对象其实是协调器DefaultCoordinator,这里又去调用了协调器的handle方法
//DefaultCoordinator对handler方法做了重载,根据事务请求对象的不同调用的方法不同
return handler.handle(this, rpcContext);
}
TC的请求转发相当绕,我觉得这个代码结构需要优化,在协调器中直接调用对应的协调器的handle方法即可,没有必要经请求对象做一次转发,而且将处理方法写在请求对象中,也使得请求对象承担了更多的职责,违反了单一职责原理。
下面使用图来展示请求的对象的转发流程:
对于每个请求,最终是由协调器的handle方法处理,协调器提供了8个重载的handle方法,分别对应上面的八种请求。
全局事务开启请求的消息类型是MessageType.TYPE_GLOBAL_BEGIN,请求对象为GlobalBeginRequest,该请求由TM发出。全局事务是由@GlobalTransactional注解的方法开启的,也就是当执行@GlobalTransactional注解的方法时,TM先发送GlobalBeginRequest请求到TC来开启全局事务。
全局事务开启请求的处理,总的来说分为两部分内容:一是创建GlobalSession对象,该对象的创建意味着全局事务的开启,GlobalSession记录了应用名,XID,事务id,全局事务状态等内容,二是生成XID,并设置GlobalSession在事务状态为开启,将XID返回至TM。
下面来看具体实现,首先是协调器中处理该请求的handle方法:
public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
GlobalBeginResponse response = new GlobalBeginResponse();
//使用模板方法
exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {
@Override
public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {
try {
doGlobalBegin(request, response, rpcContext);
} catch (StoreException e) {
throw new TransactionException(TransactionExceptionCode.FailedStore,
String.format("begin global request failed. xid=%s, msg=%s", response.getXid(), e.getMessage()),
e);
}
}
}, request, response);
return response;
}
public <T extends AbstractTransactionRequest, S extends AbstractTransactionResponse> void exceptionHandleTemplate(Callback<T, S> callback, T request, S response) {
try {
callback.execute(request, response);
callback.onSuccess(request, response);
} catch (TransactionException tex) {
LOGGER.error("Catch TransactionException while do RPC, request: {}", request, tex);
callback.onTransactionException(request, response, tex);
} catch (RuntimeException rex) {
LOGGER.error("Catch RuntimeException while do RPC, request: {}", request, rex);
callback.onException(request, response, rex);
}
}
在handle()方法里面使用了模板方法,这里我们只需要关注模板方法里面的execute的处理逻辑,其他的方法都是非常简单,不再做介绍。
我们看到execute又调用了doGlobalBegin方法:
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
throws TransactionException {
//这里的core是DefaultCore对象,DefaultCore的begin方法返回事务XID
//之后response对象携带XID返回给TM,TM收到XID后表示全局事务开启成功
response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
request.getTransactionName(), request.getTimeout()));
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
}
}
下面是DefaultCore对象的begin方法:
//name:默认是应用发起全局事务的方法名,可以通过@GlobalTransactional(name="beginTransaction")修改该值
//applicationId:发起全局事务的应用名
//transactionServiceGroup:发起全局事务的事务分组名
//timeout:事务超时时间,默认300s,可以通过@GlobalTransactional(timeoutMills = 3000)设置
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
//创建全局会话,在seata中事务称为会话
GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
timeout);
//设置监听器,监听器的作用主要是写事务日志,关于监听器后续单独文章介绍
session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
//开启事务
session.begin();
// transaction start event
// 发布全局事务开启事件,这里用到了Guava里面的EventBus工具类,这个后面单独文章介绍
eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
session.getTransactionName(), session.getBeginTime(), null, session.getStatus()));
//返回XID
return session.getXid();
}
上面代码用到了GlobalSession里面的两个方法:createGlobalSession和begin,下面分别来看一下:
public static GlobalSession createGlobalSession(String applicationId, String txServiceGroup, String txName,
int timeout) {
//GlobalSession的构造方法在下面
GlobalSession session = new GlobalSession(applicationId, txServiceGroup, txName, timeout);
return session;
}
public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout) {
this.transactionId = UUIDGenerator.generateUUID();
this.status = GlobalStatus.Begin;
this.applicationId = applicationId;
this.transactionServiceGroup = transactionServiceGroup;
this.transactionName = transactionName;
this.timeout = timeout;
//XID的组成=IP:port:transactionId
this.xid = XID.generateXID(transactionId);
}
public void begin() throws TransactionException {
this.status = GlobalStatus.Begin;
//设置事务开启时间
this.beginTime = System.currentTimeMillis();
//事务为激活状态,直到GlobalSession对象关闭
this.active = true;
//调用监听器,后续单独文章分析
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onBegin(this);
}
}
GlobalSession的这几个方法还是比较简单的,直接进行的赋值操作。
在GlobalSession的构造方法中可以看到,XID的组成:TC的IP:TC的监听端口:UUID,begin方法表示事务开启,可以看到事务开启是设置了会话的状态为GlobalStatus.Begin,并且设置了会话开始时间。
创建好GlobalSession对象,生成XID后,便将XID返回到TM,到此全局事务开启请求的处理结束。
分支事务注册请求的消息类型是MessageType.TYPE_BRANCH_REGISTER,请求对象为BranchRegisterRequest,该请求由RM发出。该请求用于开启分支事务,数据库的增删改就是在分支事务中进行的。
该请求的处理相对来说比较复杂一些,先用下面的图让大家有个了解:
下面介绍一下代码实现。
处理该请求也是在模板方法中进行的,在模板方法里面直接调用了DefaultCoordinator.doBranchRegister方法,下面直接来看doBranchRegister方法方法:
protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response,
RpcContext rpcContext) throws TransactionException {
//分支事务注册,TC生成一个分支ID,并返回到RM
//下面要调用DefaultCore的branchRegister方法
response.setBranchId(
core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(),
request.getXid(), request.getApplicationData(), request.getLockKey()));
}
//下面是DefaultCore的branchRegister方法
/** * @param branchType 分支类型,我们现在使用的是AT模式,branchType为AT * @param resourceId RM连接数据库的URL * @param clientId 客户端ID,组成是应用名:IP:端口 * @param xid the xid * @param applicationData 未知 * @param lockKeys 需要锁定记录的主键值和表,这个非常关键。RM在操作数据库前会解析SQL语句 * 分析出SQL语句要操作的表和主键值,并将这两个值拼成字符串发送到TC */
@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
String applicationData, String lockKeys) throws TransactionException {
//AT模式下branchType为AT,getCore方法返回的是ATCore对象
return getCore(branchType).branchRegister(branchType, resourceId, clientId, xid,
applicationData, lockKeys);
}
branchRegister方法调用了ATCore的branchRegister方法去注册分支事务:
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
String applicationData, String lockKeys) throws TransactionException {
//根据XID找到GlobalSession对象,这个对象是全局事务开启时创建的
GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
//1、lockAndExecute方法中要对GlobalSession对象上锁
return SessionHolder.lockAndExecute(globalSession, () -> {
//检查事务状态是否为开启和激活状态
globalSessionStatusCheck(globalSession);
//添加监听器,后续专门文章解析
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
//创建分支会话BranchSession对象
BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
applicationData, lockKeys, clientId);
//2、对记录进行上锁
branchSessionLock(globalSession, branchSession);
try {
//设置分支事务状态为BranchStatus.Registered,并且将该分支事务添加到GlobalSession中
globalSession.addBranch(branchSession);
} catch (RuntimeException ex) {
branchSessionUnlock(branchSession);
throw new BranchTransactionException(FailedToAddBranch, String
.format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),
branchSession.getBranchId()), ex);
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}",
globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);
}
//返回分支事务ID
return branchSession.getBranchId();
});
}
这里涉及到两个地方上锁,首先来看第一个SessionHolder.lockAndExecute:
public static <T> T lockAndExecute(GlobalSession globalSession, GlobalSession.LockCallable<T> lockCallable)
throws TransactionException {
//这里根会话管理器使用的是FileSessionManager,其lockAndExecute方法见下面
return getRootSessionManager().lockAndExecute(globalSession, lockCallable);
}
public <T> T lockAndExecute(GlobalSession globalSession, GlobalSession.LockCallable<T> lockCallable)
throws TransactionException {
//全局事务上锁,这个锁定的是事务自身,如果在全局事务中存在并发分支事务,这里会被拦截
globalSession.lock();
try {
//调用回调方法
return lockCallable.call();
} finally {
globalSession.unlock();
}
}
第一个加锁相对比较简单,就是对全局会话GlobalSession直接加锁,底层使用的是ReentrantLock,如果当前已经被加锁,最长锁等待时间是2000ms。代码如下:
private Lock globalSessionLock = new ReentrantLock();
private static final int GLOBAL_SESSION_LOCK_TIME_OUT_MILLS = 2 * 1000;
public void lock() throws TransactionException {
try {
if (globalSessionLock.tryLock(GLOBAL_SESSION_LOCK_TIME_OUT_MILLS, TimeUnit.MILLISECONDS)) {
return;
}
} catch (InterruptedException e) {
LOGGER.error("Interrupted error", e);
}
throw new GlobalTransactionException(TransactionExceptionCode.FailedLockGlobalTranscation, "Lock global session failed");
}
下面来看第二个加锁的地方,也就是方法branchSessionLock:
protected void branchSessionLock(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
//调用分支会话加锁,如果加锁失败,则抛出异常
if (!branchSession.lock()) {
throw new BranchTransactionException(LockKeyConflict, String
.format("Global lock acquire failed xid = %s branchId = %s", globalSession.getXid(),
branchSession.getBranchId()));
}
}
//下面是BranchSession的lock方法,从这里可以看到首先获取锁管理器,然后使用锁管理器上锁
public boolean lock() throws TransactionException {
if (this.getBranchType().equals(BranchType.AT)) {
return LockerManagerFactory.getLockManager().acquireLock(this);
}
return true;
}
seata锁管理器与存储模式有关,存储模式可以是db、file、redis,这里使用的是file,对应的类是FileLockManager。下面是FileLockManager的acquireLock方法:
public boolean acquireLock(BranchSession branchSession) throws TransactionException {
if (branchSession == null) {
throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");
}
//得到本分支事务需要加锁的记录和表名,RM将要加锁的记录主键值和表名组装成字符串发送到TC
String lockKey = branchSession.getLockKey();
if (StringUtils.isNullOrEmpty(lockKey)) {
// no lock
return true;
}
// 创建RowLock集合,RowLock对象表示要加锁的表及主键中字段的值,一条加锁记录一个RowLock对象
List<RowLock> locks = collectRowLocks(branchSession);
if (CollectionUtils.isEmpty(locks)) {
// no lock
return true;
}
//当前设置存储模式是file,getLocker方法返回的是FileLocker
return getLocker(branchSession).acquireLock(locks);
}
protected List<RowLock> collectRowLocks(BranchSession branchSession) {
List<RowLock> locks = new ArrayList<>();
if (branchSession == null || StringUtils.isBlank(branchSession.getLockKey())) {
return locks;
}
String xid = branchSession.getXid();
//得到资源id,也就是数据库连接URL
String resourceId = branchSession.getResourceId();
//得到事务ID,分支事务的transactionId与GlobalSession的transactionId是相同的
long transactionId = branchSession.getTransactionId();
String lockKey = branchSession.getLockKey();
//遍历主键中的每个字段值,对每个字段值创建RowLock对象
return collectRowLocks(lockKey, resourceId, xid, transactionId, branchSession.getBranchId());
}
protected List<RowLock> collectRowLocks(String lockKey, String resourceId, String xid, Long transactionId,
Long branchID) {
List<RowLock> locks = new ArrayList<RowLock>();
//当要对多个记录加锁时,中间使用";"分隔
String[] tableGroupedLockKeys = lockKey.split(";");
for (String tableGroupedLockKey : tableGroupedLockKeys) {
//表名和记录主键值之间使用":"分隔
int idx = tableGroupedLockKey.indexOf(":");
if (idx < 0) {
return locks;
}
//tableName为要加锁的表名
String tableName = tableGroupedLockKey.substring(0, idx);
//mergedPKs为要加锁的记录主键值,如果主键有多个字段,则使用"_"分隔
//如果需要一次加锁多条记录,则记录之间使用","分隔
String mergedPKs = tableGroupedLockKey.substring(idx + 1);
if (StringUtils.isBlank(mergedPKs)) {
return locks;
}
String[] pks = mergedPKs.split(",");
if (pks == null || pks.length == 0) {
return locks;
}
//遍历主键中的每个字段,创建RowLock对象,每个主键值都使用字符串表示
for (String pk : pks) {
if (StringUtils.isNotBlank(pk)) {
RowLock rowLock = new RowLock();
rowLock.setXid(xid);
rowLock.setTransactionId(transactionId);
rowLock.setBranchId(branchID);
rowLock.setTableName(tableName);
rowLock.setPk(pk);
rowLock.setResourceId(resourceId);
locks.add(rowLock);
}
}
}
return locks;
}
上面的collectRowLocks方法主要是遍历需要加锁的记录,加锁的记录存储在BranchSession的lockKey属性中,如果需要对一个表的两条记录加锁,而且该表主键有两个字段,则lockKey的组成是:
表名:主键值_主键值;表名:主键值_主键值
collectRowLocks方法遍历lockKey的每条记录,并且一条记录生成一个RowLock对象。然后调用FileLocker.acquireLock方法对RowLock对象加锁。
acquireLock方法遍历所有的RowLock对象,对记录加锁时,首先查看当前记录是否已经加锁,如果没有加过,则直接记录主键值和事务id,表示对该记录上锁,如果已经加过,则比较本次加锁的事务id与上次的事务id是否一致,如果一致,则直接跳过,如果不一致,意味着加锁失败,接下来会对该分支事务已经加过的锁解锁,然后返回RM加锁失败。
acquireLock里面创建BucketLockMap对象记录了加锁的主键值和加锁的事务id的对应关系,表示该记录是由该事务加锁,BucketLockMap可以简单认为是Map对象,然后BucketLockMap会被存储到两个地方:BranchSession的lockHolder属性和LOCK_MAP。
lockHolder属性是一个Map,key是BucketLockMap对象,value是一个集合,存储了数据库的主键值,凡是记录到该集合中就表示本分支事务对这些记录加了锁,释放锁的时候就根据value集合从BucketLockMap对象中删除记录。
下面重点介绍LOCK_MAP。
LOCK_MAP是一个全局的锁集合,记录了目前所有事务正在加的锁,判断记录是否已经加锁以及加锁的事务id就是通过LOCK_MAP完成的,它的类型如下:
ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, BucketLockMap>>>
结构是:数据库资源->表名->桶->BucketLockMap。
前面两个比较好理解,BucketLockMap也已经介绍过。
LOCK_MAP将每个数据库表下面分了128个桶,每个桶对应一个BucketLockMap,一条加锁记录分在哪个桶,是根据主键值的hash值模128得到的,如果分到某个桶下,就将主键值和事务id添加到桶对应的BucketLockMap中,所以BucketLockMap里面其实存储了不同事务的加锁记录,下面来看一下具体实现:
public boolean acquireLock(List<RowLock> rowLocks) {
//如果集合为空,表示没有要加锁的记录
if (CollectionUtils.isEmpty(rowLocks)) {
//no lock
return true;
}
String resourceId = branchSession.getResourceId();
long transactionId = branchSession.getTransactionId();
//bucketHolder:BucketLockMap->主键值集合,表示当前分支事务有哪些记录已经加锁了
//在branchSession中记录lockHolder是为了在释放锁的时候使用
//BucketLockMap是一个Map对象,是记录主键值与transactionId的对应关系
ConcurrentMap<BucketLockMap, Set<String>> bucketHolder = branchSession.getLockHolder();
//dbLockMap:表名->桶->BucketLockMap,
//LOCK_MAP:数据库资源->表名->桶->BucketLockMap,
//LOCK_MAP记录了全局所有事务的所有加锁记录
ConcurrentMap<String, ConcurrentMap<Integer, BucketLockMap>> dbLockMap = LOCK_MAP.get(resourceId);
if (dbLockMap == null) {
LOCK_MAP.putIfAbsent(resourceId, new ConcurrentHashMap<>());
dbLockMap = LOCK_MAP.get(resourceId);
}
//遍历每个需要加锁的记录
for (RowLock lock : rowLocks) {
String tableName = lock.getTableName();
//主键值
String pk = lock.getPk();
ConcurrentMap<Integer, BucketLockMap> tableLockMap = dbLockMap.get(tableName);
if (tableLockMap == null) {
dbLockMap.putIfAbsent(tableName, new ConcurrentHashMap<>());
tableLockMap = dbLockMap.get(tableName);
}
//得到桶的值,桶的个数是128个
int bucketId = pk.hashCode() % BUCKET_PER_TABLE;
BucketLockMap bucketLockMap = tableLockMap.get(bucketId);
if (bucketLockMap == null) {
tableLockMap.putIfAbsent(bucketId, new BucketLockMap());
bucketLockMap = tableLockMap.get(bucketId);
}
//previousLockTransactionId为之前的事务id
Long previousLockTransactionId = bucketLockMap.get().putIfAbsent(pk, transactionId);
//下面分为三种情况,
// 一是当前加锁的事务与之前已经加过锁的事务是同一个,
// 二是两个事务不是同一个,
// 三是之前还没有加过锁
if (previousLockTransactionId == null) {
//No existing lock, and now locked by myself
Set<String> keysInHolder = bucketHolder.get(bucketLockMap);
if (keysInHolder == null) {
bucketHolder.putIfAbsent(bucketLockMap, new ConcurrentSet<>());
keysInHolder = bucketHolder.get(bucketLockMap);
}
//如果该记录没有加过锁,则直接将记录保存到集合中
keysInHolder.add(pk);
} else if (previousLockTransactionId == transactionId) {
//如果加锁的事务是同一个,则跳过加锁
// Locked by me before
continue;
} else {
LOGGER.info("Global lock on [" + tableName + ":" + pk + "] is holding by " + previousLockTransactionId);
try {
//如果要加锁的记录已经被其他事务加过锁,则释放当前分支事务已经加过的锁,并返回加锁失败
// Release all acquired locks.
branchSession.unlock();
} catch (TransactionException e) {
throw new FrameworkException(e);
}
return false;
}
}
return true;
}
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_38308374/article/details/108457173
内容来源于网络,如有侵权,请联系作者删除!