通过底层技术实现 RMI

x33g5p2x  于2022-05-16 转载在 其他  
字(6.6k)|赞(0)|评价(0)|浏览(226)

一 点睛

RMI:远程方法调用(Remote Method Invocation),只适用于 Java 语言。也就是说,各台计算上使用的语言都必须是 Java。

实现机制:在本地(客户端),直接操作的是服务端接口在本地的副本(称为stub),通过stub来充当服务端的接口。例如,如果客户端 L 对象药调用服务端接口 R 中 method() 方法,那么是无法在客户端直接写成L.method()。因此,就必须先将服务端的 method() 的接口 R 在本地复制一份( 即stub),然后才能写成L.method()。可以发现,在 RMI 通信时,客户端接口借助的是 stub,而服务端为了便于组织管理服务端商的各个接口,以及为了便于和客户端通信,服务端也设置了一个辅助对象 skeleton。即 RMI 通信过程,实际是客户端 stub 和 服务端的 skeleton 在直接交互,如下图所示。

二 具体流程

1 客户端与服务端之间通过 Socket 交互信息。

a 客户端先以字符串的形式定义好需要请求的接口名("remote.procedure.call.server.RMIService",)然后再将此字符串通过反射,解析出请求的接口名、方法名、方法参数等信息,并将这些信息通过对象流 ObjectOutputStream 发送给服务端。

b 服务端逐个解析这些信息,并通过反射中 invoke() 方法调用服务端上被请求的方法。

2 因为服务端可能存在多个提供方法的接口,因此服务端需要一个“服务注册中心”来统一管理这些接口;当客户端请求某一个接口时,“服务注册中心”就可以立刻获取并提供那个接口。其中,“服务注册中心”可以是一个 Map,其中 Key 存放着接口的名字,value 就是响应方法的接口对象。

3 “服务注册中心”根据客户端请求,找到相应接口后(通过map.get(接口名)),通过该接口的实现类提供方法(即执行实现类中的方法)。

4 服务完毕,服务端再将返回值通过对象流传给客户端。

5 因为不同的服务会返回不同的数据类型给客户端,因此客户端需要通过动态代理来接收服务端的返回值。

三 流程图

四 代码

1 提供服务接口

package rmi.server;

// 服务端上,提供服务的接口
public interface RMIService {
    String sayHi(String name);
}

2 提供服务接口的实现类

package rmi.server;

// 服务端上,提供服务接口的实现类
public class RMIServiceImpl implements RMIService {
    @Override
    public String sayHi(String name) {
        return "hi," + name;
    }
}

3 服务注册中心接口

package rmi.server;

/**
 * @className: ServerCenter
 * @description: 服务注册中心接口
 * @date: 2022/5/14
 * @author: cakin
 */
public interface ServerCenter {
    // 启动服务
    void start();

    // 关闭服务
    void stop();

    // 注册服务
    void register(Class service, Class serviceImpl);
}

4 服务注册中心实现类

package rmi.server;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// 服务注册中心实现类
public class ServerCenterImpl implements ServerCenter {
    // map:服务端的所有可供客户端访问的接口,都注册到该 map 中
    // key: 接口的名字(如"RMIService"),value:真正的提供服务的类(如RMIServiceImpl类)
    private static HashMap<String, Class> serviceRegiser = new HashMap<>();
    // 服务端的端口号
    private static int port;
    // 线程池:线程池中存在多个线程对象,每个线程对象都可以处理一个客户请求
    private static ExecutorService executor
            = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    // 是否开启服务
    private static volatile boolean isRunning = false;

    public ServerCenterImpl(int port) {
        this.port = port;
    }

    // 开启服务端服务
    @Override
    public void start() {
        ServerSocket server = null;
        try {
            server = new ServerSocket();
            server.bind(new InetSocketAddress(port));
        } catch (IOException e1) {
            e1.printStackTrace();
        }
        isRunning = true;
        // 客户端每次请求一次请求,服务端从线程池中启动一个线程对象去处理
        while (true) {
            // 具体的服务内容:接收客户端请求,处理请求,并返回结果
            System.out.println("服务已启动...");
            Socket socket = null;
            try {
                socket = server.accept(); // 等待客户端连接
            } catch (IOException e) {
                e.printStackTrace();
            }
            // 启动一个线程 去处理客户请求
            executor.execute(new ServiceTask(socket));
        }
    }

    // 关闭服务
    @Override
    public void stop() {
        isRunning = false;
        executor.shutdown();
    }

    // 将接口名和接口实现类一一对应,以便于接收到请求时,能及时获取到对应的服务实现类
    @Override
    public void register(Class service, Class serviceImpl) {
        serviceRegiser.put(service.getName(), serviceImpl);
    }

    // 处理请求的线程
    private static class ServiceTask implements Runnable {
        private Socket socket;

        public ServiceTask() {
        }

        public ServiceTask(Socket socket) {
            this.socket = socket;
        }

        // 具体的处理逻辑
        @Override
        public void run() {
            ObjectOutputStream output = null;
            ObjectInputStream input = null;
            try {
                // 接收到客户端的各个请求参数(接口名、方法名、参数类型、参数值)
                input = new ObjectInputStream(socket.getInputStream());
                // 因为 ObjectInputStream 对发送数据的顺序有严格要求,因此必须按照发送的顺序逐个接收
                // 请求的接口名
                String serviceName = input.readUTF();
                // 请求的方法名
                String methodName = input.readUTF();
                // 请求方法的参数类型
                Class[] parameterTypes = (Class[]) input.readObject();
                // 请求方法的参数名
                Object[] arguments = (Object[]) input.readObject();
                // 根据客户请求,到服务注册中心 map 中找到与之对应的具体接口(即RMIService)
                Class ServiceClass = serviceRegiser.get(serviceName);
                // 构建请求的方法
                Method method = ServiceClass.getMethod(methodName, parameterTypes);
                // 执行该方法
                Object result = method.invoke(ServiceClass.newInstance(), arguments);
                // 将执行完毕的返回值,返回给客户端
                output = new ObjectOutputStream(socket.getOutputStream());
                output.writeObject(result);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    if (output != null)
                        output.close();
                    if (input != null)
                        input.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

5 客户端

package rmi.client;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;

public class RMIClient {
    /**
     * 功能描述:获取代表服务端接口的动态代理对象(RMIService)
     *
     * @param serviceInterface:请求的接口名
     * @param addr:待请求服务端的ip:端口
     * @author cakin
     * @date 2022/5/14
     */
    @SuppressWarnings("unchecked")
    public static <T> T getRemoteProxyObj(Class serviceInterface,
                                          InetSocketAddress addr) {
		/*
		   newProxyInstance(a,b,c)中,前两个参数的含义如下:
		   	a:类加载器 :需要代理那个对象的类加载器
		    b:用于表示需要代理的对象提供了哪些方法。Java是单继承、多实现,
		    	因此,如果某一个对象实现了多个接口,那么该对象就拥有其全部接口的所有方法,因此是一个接口数组。
		        例如,如果有A implements B接口,c接口,并且B接口中有3个方法,C接口中有2个方法,那么A的对象就拥有5个方法(暂不考虑A自身提供的方法)
		 */
        return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(),
                new Class<?>[]{serviceInterface}, new InvocationHandler() {
                    /**
                     * 功能描述:
                     *
                     * @author cakin
                     * @date 2022/5/14
                     * @param proxy:代理的对象
                     * @param method:哪个方法(sayHi(参数列表))
                     * @param args:参数列表
                     * @return T
                     */
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) {
                        // 客户端向服务端发送请求:请求某一个具体的接口
                        Socket socket = new Socket();
                        ObjectOutputStream output = null;
                        ObjectInputStream input = null;
                        try {
                            // addr包含了要访问的服务端的Ip和端口
                            socket.connect(addr);
                            // 通过序列化流(对象流)向服务端发送请求
                            output = new ObjectOutputStream(socket.getOutputStream());
                            // 发送请求的接口名
                            output.writeUTF(serviceInterface.getName());
                            // 发送请求的方法名
                            output.writeUTF(method.getName());
                            // 发送请求的方法的参数的类型
                            output.writeObject(method.getParameterTypes());
                            // 发送请求的方法的参数值
                            output.writeObject(args);
                            // 等待服务端处理...
                            // 接收服务端处理后的返回值
                            input = new ObjectInputStream(socket.getInputStream());
                            return input.readObject();
                        } catch (Exception e) {
                            e.printStackTrace();
                            return null;
                        } finally {
                            try {
                                if (output != null) output.close();
                                if (input != null) input.close();
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }
                });
    }
}

6 服务端启动类

package rmi.test;

import rmi.server.RMIService;
import rmi.server.RMIServiceImpl;
import rmi.server.ServerCenter;
import rmi.server.ServerCenterImpl;

public class TestRMIServer {
    public static void main(String[] args) {
        // 用线程的形式启动服务
        new Thread(() -> {
            // 服务中心
            ServerCenter server = new ServerCenterImpl(9999);
            // 将 RMIService 接口及实现类,注册到服务中心
            server.register(RMIService.class, RMIServiceImpl.class);
            server.start();
        }).start();
    }
}

7 客户端启动类

package rmi.test;
import rmi.client.RMIClient;
import rmi.server.RMIService;
import java.net.InetSocketAddress;
// 先启动服务端,再启动客户端,就能看到客户端成功的调用了服务端上的 sayHi() 方法。
public class TestRMIClient {
    public static void main(String[] args) throws ClassNotFoundException {
        // 调用远程的 rmi.server.RMIService 接口,并执行接口中的 sayHi() 方法
        RMIService service = RMIClient.getRemoteProxyObj(
                Class.forName("rmi.server.RMIService" ) ,
                new InetSocketAddress("127.0.0.1", 9999)) ;
        System.out.println( service.sayHi("zs")  ) ;
    }
}

五 测试结果

hi,zs

相关文章