场景描述
RM二阶段发生异常时,TC是会进行不断的重试。直到成功为止
那么,问题就来了。一直重试虽然在一定程度上可能成功,但大部分情况下一直重试是不会成功的
这个时候,是需要作出一定的告警措施,比如发邮件,发钉钉之类的
比较幸运的是,seata提供了SPI,这样可以比较方便的实现。经过一番研究尝试,终于找到比较好的实现方式。如下,enjoy!
Step One
注入自己实现的SPI
新建文件: src/main/resources/META-INF/services/io.seata.core.model.ResourceManager
文件内容如下。注意,是实现类的全路径来的,不懂SPI机制可先自行谷歌
com.xxx.MyTCCResourceManager
Step Two
com.xxx.MyTCCResourceManager类代码。大部分抄官方的io.seata.rm.tcc.TCCResourceManager代码即可,这里只列出关键代码
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
if (tccResource == null) {
throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
}
Object targetTCCBean = tccResource.getTargetBean();
Method commitMethod = tccResource.getCommitMethod();
if (targetTCCBean == null || commitMethod == null) {
throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId));
}
try {
//BusinessActionContext
BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
applicationData);
Object[] args = this.getTwoPhaseCommitArgs(tccResource, businessActionContext);
Object ret;
boolean result;
// add idempotent and anti hanging
if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
try {
result = TCCFenceHandler.commitFence(commitMethod, targetTCCBean, xid, branchId, args);
} catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
throw e.getCause();
}
} else {
ret = commitMethod.invoke(targetTCCBean, args);
if (ret != null) {
if (ret instanceof TwoPhaseResult) {
result = ((TwoPhaseResult)ret).isSuccess();
} else {
result = (boolean)ret;
}
} else {
result = true;
}
}
LOGGER.info("TCC resource commit result : {}, xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId);
if (!result) {
// !!!FBI WARNING!!! 关键地方。把你做告警的代码放这里就可以了!!!
}
return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
} catch (Throwable t) {
String msg = String.format("commit TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
LOGGER.error(msg, t);
// !!!FBI WARNING!!! 关键地方。把你做告警的代码放这里就可以了!!!
return BranchStatus.PhaseTwo_CommitFailed_Retryable;
}
}
@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
if (tccResource == null) {
throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
}
Object targetTCCBean = tccResource.getTargetBean();
Method rollbackMethod = tccResource.getRollbackMethod();
if (targetTCCBean == null || rollbackMethod == null) {
throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId));
}
try {
//BusinessActionContext
BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
applicationData);
Object[] args = this.getTwoPhaseRollbackArgs(tccResource, businessActionContext);
Object ret;
boolean result;
// add idempotent and anti hanging
if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
try {
result = TCCFenceHandler.rollbackFence(rollbackMethod, targetTCCBean, xid, branchId,
args, tccResource.getActionName());
} catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
throw e.getCause();
}
} else {
ret = rollbackMethod.invoke(targetTCCBean, args);
if (ret != null) {
if (ret instanceof TwoPhaseResult) {
result = ((TwoPhaseResult)ret).isSuccess();
} else {
result = (boolean)ret;
}
} else {
result = true;
}
}
LOGGER.info("TCC resource rollback result : {}, xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId);
if (!result) {
// !!!FBI WARNING!!! 关键地方。把你做告警的代码放这里就可以了!!!
}
return result ? BranchStatus.PhaseTwo_Rollbacked : BranchStatus.PhaseTwo_RollbackFailed_Retryable;
} catch (Throwable t) {
String msg = String.format("rollback TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
LOGGER.error(msg, t);
// !!!FBI WARNING!!! 关键地方。把你做告警的代码放这里就可以了!!!
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
}
}
5条答案
按热度按时间9bfwbjaz1#
分享另外一个。
在做这个告警的时候,一开始是想着用aop LocalTCC这个注解来实现的,但用了aop之后,会导致confirm和cancel拿不到全局事务id的!所以这种方法是不可取的。
反正也写了一些代码,就分享一下,记住,以下代码会导致问题的!!!
切面类
配置类
ctzwtxfj2#
建议放到 Discussions 去,或者给官网 https://github.com/seata/seata.github.io 提交一篇博客pr
sd2nnvve3#
还有个问题我想讨论下,作为用户视角,你觉得发出告警的是client,还是server呢?比如server下发给client,其执行一直失败,这个其实server端也能感知到,比如在prometheus打个点,是否也能做成告警?
但是我总觉得client去处理的话自定义打点肯定比server要细致,更容易也更精确知道一些问题
oyjwcjzk4#
还有个问题我想讨论下,作为用户视角,你觉得发出告警的是client,还是server呢?比如server下发给client,其执行一直失败,这个其实server端也能感知到,比如在prometheus打个点,是否也能做成告警? 但是我总觉得client去处理的话自定义打点肯定比server要细致,更容易也更精确知道一些问题
感谢回复。我昨晚看完你的回复之后,想了一夜,觉得你说的比较符合实情,放在server用prometheus做告警这样会比较综合处理一些,这样会更省事。而放client的话,虽然说是说能更细致,但复杂度多很多 @a364176773
9vw9lbht5#
@a364176773
我看了下server-1.5.2的metrics。如果做监控的话,需要为每个接入的分布式事务做才行,没有统一的指标。如下图所示