- I have searched the issues of this repository and believe that this is not a duplicate.
Ⅰ. Issue Description
2.0.0 /ext/apm-seata-skywalking-plugin在实现interceptor时,入参类型转换异常
Ⅱ. Describe what happened
If there is an exception, please attach the exception trace:
ERROR 2024-06-13 14:02:26.871 NettyServerNIOWorker_1_2_2 InstMethodsInter : class[class io.seata.core.rpc.processor.server.ServerHeartbeatProcessor] after method[process] intercept failure
java.lang.ClassCastException: io.netty.channel.DefaultChannelHandlerContext cannot be cast to io.seata.core.protocol.RpcMessage
at io.seata.apm.skywalking.plugin.RemotingProcessorProcessInterceptor.afterMethod(RemotingProcessorProcessInterceptor.java:70)
at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:97)
at io.seata.core.rpc.processor.server.ServerHeartbeatProcessor.process(ServerHeartbeatProcessor.java)
at io.seata.core.rpc.netty.AbstractNettyRemoting.processMessage(AbstractNettyRemoting.java:306)
at io.seata.core.rpc.netty.AbstractNettyRemotingServer$ServerHandler.channelRead(AbstractNettyRemotingServer.java:169)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
ERROR 2024-06-13 14:02:26.871 NettyServerNIOWorker_1_2_2 InstMethodsInter : class[class io.seata.core.rpc.netty.NettyRemotingServer] after method[sendAsync] intercept failure
java.lang.ClassCastException: io.netty.channel.socket.nio.NioSocketChannel cannot be cast to io.seata.core.protocol.RpcMessage
at io.seata.apm.skywalking.plugin.NettyRemotingClientSendSyncInterceptor.afterMethod(NettyRemotingClientSendSyncInterceptor.java:71)
at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:97)
at io.seata.core.rpc.netty.AbstractNettyRemoting.sendAsync(AbstractNettyRemoting.java)
at io.seata.core.rpc.netty.AbstractNettyRemotingServer.sendAsyncResponse(AbstractNettyRemotingServer.java:104)
at io.seata.core.rpc.processor.server.ServerHeartbeatProcessor.$sw$original$process$hr32d22(ServerHeartbeatProcessor.java:48)
at io.seata.core.rpc.processor.server.ServerHeartbeatProcessor.$sw$original$process$hr32d22$accessor$$sw$aeun762(ServerHeartbeatProcessor.java)
at io.seata.core.rpc.processor.server.ServerHeartbeatProcessor$$sw$auxiliary$in3hfi1.call(Unknown Source)
还有【beforeMethod】方法类型转换异常
Ⅲ. Describe what you expected to happen
主要的异常产生位置为:
DefaultCoreDoGlobalCommitInterceptor、NettyRemotingClientSendSyncInterceptor、RemotingProcessorProcessInterceptor和SWSeataUtils
DefaultCoreDoGlobalCommitInterceptor类中:
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
RpcMessage rpcMessage = null;
for(Object arg:allArguments) {
if (arg instanceof RpcMessage) {
rpcMessage = (RpcMessage) arg;
if (!(rpcMessage.getBody() instanceof AbstractMessage)) {
return;
}
}
}
/* 异常产生位置
RpcMessage rpcMessage = (RpcMessage) allArguments[0];
if (!(rpcMessage.getBody() instanceof AbstractMessage)) {
return;
}*/
。。。
@OverRide
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
for(Object arg:allArguments){
if(arg instanceof RpcMessage){
RpcMessage rpcMessage = (RpcMessage) arg;
if (rpcMessage.getBody() instanceof AbstractMessage) {
ContextManager.stopSpan();
}
}
}
/* 异常产生位置
RpcMessage rpcMessage = (RpcMessage) allArguments[0];
if (rpcMessage.getBody() instanceof AbstractMessage) {
ContextManager.stopSpan();
}*/
return ret;
}
NettyRemotingClientSendSyncInterceptor类中:
@OverRide
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
for(Object arg:allArguments){
if(arg instanceof RpcMessage){
RpcMessage rpcMessage = (RpcMessage) arg;
if (rpcMessage.getBody() instanceof AbstractMessage) {
ContextManager.stopSpan();
}
}
}
/*RpcMessage rpcMessage = (RpcMessage) allArguments[0];
if (rpcMessage.getBody() instanceof AbstractMessage) {
ContextManager.stopSpan();
}*/
return ret;
}
RemotingProcessorProcessInterceptor类中:
@OverRide
public Object afterMethod(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
for(Object arg:allArguments){
if(arg instanceof RpcMessage){
RpcMessage rpcMessage = (RpcMessage) arg;
if (rpcMessage.getBody() instanceof AbstractMessage) {
ContextManager.stopSpan();
}
}
}
/*RpcMessage rpcMessage = (RpcMessage) allArguments[0];
if (rpcMessage.getBody() instanceof AbstractMessage) {
ContextManager.stopSpan();
}*/
return ret;
}
SWSeataUtils类中:
public static String convertXid(RpcMessage rpcMessage) {
String xid = null;
// 调整后
if(rpcMessage != null && rpcMessage.getBody() != null){
if(rpcMessage.getBody() instanceof AbstractMessage){
AbstractMessage subMessage = (AbstractMessage) rpcMessage.getBody();
String requestSimpleName = rpcMessage.getBody().getClass().getSimpleName();
try {
xid = SWSeataConstants.TRANSACTION_TRANSMISSION_CLASS_NAME_MAPPING.get(requestSimpleName) != null
? (String) SWSeataConstants.TRANSACTION_TRANSMISSION_CLASS_NAME_MAPPING.get(requestSimpleName)
.getDeclaredMethod("getXid").invoke(subMessage)
: xid;
} catch (Throwable e) {
LOGGER.error("convert seata xid failure", e);
}
}
}
return xid;
}
以上是测试通过后的代码调整,仅供参考
Ⅵ. Environment:
- JDK version(e.g.
java -version
): jdk 8 - Seata client/server version: 2.0.0
- Database version:
- OS(e.g.
uname -a
): - Others: skywalking-java-agent 9.0.0-alpine
1条答案
按热度按时间1szpjjfi1#
you can Consider providing a PR for the SkyWalking seata plugin