网络编程 NIO

x33g5p2x  于2021-12-06 转载在 其他  
字(8.6k)|赞(0)|评价(0)|浏览(499)

NIO的介绍

一个复用器上是可以注册多个用户的连接,有效的用户请求才会来通知用户程序,用户程序才进行介入处理:

复用器(Selector)也叫做多路选择器,作用是检查一个或者多个NIO的channel(Channel),可以实现单线程管理多个channel,也可以管理多个网络请求.

channel:通道

用户IO操作的连接,对原有的IO补充,不能直接访问数据需要和缓冲区Buffer结合使用,通道主要有:

  • SocketChannel:主要是用户连接服务端,一般在客户端实现
  • ServerSocketChannel:监听新进来的TCP连接,对于每一个新用户连接创建一个SocketChannel实例,一般在服务端实现

Buffer:缓冲区

IO流中的数据需要经过缓冲区交给channel

NIO编码

服务端编程

  1. public class Server {
  2.    public static void main(String[] args) {
  3.        ServerSocketChannel serverSocketChannel = null;
  4.        try {
  5.            //创建ServerSocketchannel实例
  6.            serverSocketChannel = ServerSocketChannel.open();
  7.            //绑定端口
  8.            serverSocketChannel.bind(new InetSocketAddress(7777));
  9.            System.out.println("服务端启动啦");
  10.            //设置ServerSocketchannel为非阻塞 true:表示则塞 false:非阻塞
  11.            serverSocketChannel.configureBlocking(false);
  12.            //创建复用器selector实例
  13.            Selector selector = Selector.open();
  14.            //将serverSocketChannel注册到selector中,并关注的是可接受事件
  15.            /**
  16.             * register(Selector sel, int ops)
  17.             * 第一个参数:复用器实例
  18.             * 第二个参数:表述关注的事件
  19.             * 存在4种事件
  20.             * 1、可读事件 read
  21.             * 2、可写事件 write
  22.             * 3、可接收事件 accept
  23.             * 4、可连接事件 connect
  24.             *
  25.             */
  26.            serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT | SelectionKey.OP_WRITE);
  27.            //selector阻塞等待事件发生
  28.            while (selector.select() >= 0) {
  29.                //获取已发生事件集合
  30.                Iterator <SelectionKey> iterator = selector.selectedKeys().iterator();
  31.                while (iterator.hasNext()) {
  32.                    SelectionKey selectionKey = iterator.next();
  33.                    //删除已完成事件
  34.                    iterator.remove();
  35.                    if (selectionKey.isValid() && selectionKey.isAcceptable()) {
  36.                        System.out.println("有接收事件发生");
  37.                        //有可接受事件发生
  38.                        //SelectableChannel 所有通道的父类,ServerSocketChannel、SocketChannel
  39.                        ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) selectionKey.channel();
  40.                        //通过调用accept获取SocketChannel实例
  41.                        SocketChannel socketChannel = serverSocketChannel1.accept();
  42.                        System.out.println("有新用户连接啦"+socketChannel.getRemoteAddress());
  43.                        //设置为非阻塞
  44.                        socketChannel.configureBlocking(false);
  45.                        //将socketChannel注册到selector复用器上,并关注读事件
  46.                        socketChannel.register(selector, SelectionKey.OP_READ);
  47.                   }
  48.                    if (selectionKey.isValid() && selectionKey.isReadable()) {
  49.                        System.out.println("有可读事件发生");
  50.                        //有可读事件发生
  51.                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
  52.                        //读取数据
  53.                        //创建一个缓存实例
  54.                        ByteBuffer buffer = ByteBuffer.allocate(100);
  55.                        //将数据从channel写入缓存
  56.                        int read = socketChannel.read(buffer);
  57.                        //读写模式切换
  58.                        buffer.flip();
  59.                        //创建有效大小的byte数组
  60.                        byte[] bytes = new byte[read];
  61.                        buffer.get(bytes);
  62.                        String msg = new String(bytes);
  63.                        System.out.println("数据为:"+msg);
  64.                   }
  65.               }
  66.           }
  67.       } catch (IOException e) {
  68.            e.printStackTrace();
  69.       } finally {
  70.            //关闭资源
  71.            System.out.println("服务端关闭啦");
  72.            if (serverSocketChannel != null) {
  73.                try {
  74.                    serverSocketChannel.close();
  75.               } catch (IOException e) {
  76.                    e.printStackTrace();
  77.               }
  78.           }
  79.       }
  80.   }
  81. }

客户端编码流程

  1. public class Client {
  2.    public static void main(String[] args) {
  3.        SocketChannel socketChannel = null;
  4.        try {
  5.            //创建socketChannel实例
  6.            socketChannel = SocketChannel.open();
  7.            //将socketChannel设置为非阻塞
  8.            socketChannel.configureBlocking(false);
  9.            //创建Selector复用器实例
  10.            Selector selector = Selector.open();
  11.            //将socketChannel注册到Selector实例中,并关注可连接事件
  12.            socketChannel.register(selector,SelectionKey.OP_CONNECT);
  13.            //主动连接服务端,该方法不会阻塞,会立即返回结果,要么成功,要么未连接成功,则交由os等待连接过程
  14.            if (!socketChannel.connect(new InetSocketAddress("127.0.0.1", 7777))) {
  15.                //返回为false,表示连接还未连接成功
  16.                System.out.println("connect还未完成!");
  17.                //等待系统返回就绪事件
  18.                selector.select();
  19.                //进行完成连接操作
  20.                socketChannel.finishConnect();
  21.           }
  22.            System.out.println("客户端连接服务端成功");
  23.            //进行读写事件
  24.            byte[] bytes = "Hello Tulun".getBytes();
  25.            //创建buffer
  26.            ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
  27.            //往Buffer写数据
  28.            buffer.put(bytes);
  29.            //读写模式切换
  30.            buffer.flip();
  31.            //往socketchannel通道中写数据
  32.            socketChannel.write(buffer);
  33.       } catch (IOException e) {
  34.            e.printStackTrace();
  35.       } finally {
  36.            //关闭资源
  37.            if (socketChannel != null) {
  38.                System.out.println("客户端关闭啦");
  39.                try {
  40.                    socketChannel.close();
  41.               } catch (IOException e) {
  42.                    e.printStackTrace();
  43.               }
  44.           }
  45.       }
  46.   }
  47. }

练习:

通过NIO实现Echo命令,要求:客户端可多次进行消息发送。

问题:

问题1:客户端为什么主动connect连接?

在BIO中connect操作是可阻塞的方法,在NIO中设置为非阻塞,交给复用器来监听事件是否完成(connect可连接事件),内核帮助监听事件是否完成,必须先触发事件,触发之后内核才能监听事件是否完成,客户端来主动连接服务端,在NIO中connect是非阻塞,当前connect操作立即放回(true:连接已完成 false:当前事件已经触发了,但连接还没完成,就需要等待内核来关注事件是否完成)。

问题2:为什么客户端断开连接服务端一致循环接收到可读事件为空?

客户端断开连接,服务端会接收到-1,占用空间,服务端认为有数据接收,就会一直认为有可读事件服务端需要处理。

判断通道接收为-1表示结束接收

问题3:为什么写操作没有注册到复用器?

写操作在NIO中也是一个事件,但是写事件需要主动发起写操作,一般写完会立即write操作不会进行阻塞,即通常写操作不需要注册。

NIO重要组件

channel:通道

channel示例:

channel主要实现类

  • SocketChannel:通过TCP来读写网络中的数据,一般是客户端的实现
  • ServerSocketChannel:监听新进来的TCP连接,对每一个新连接创建一个SocketChannel,一般是服务端的实现
  • DatagramChannel:通过UDP读写网络中的数据通道
  • FileChannel:用于读取、写入等操作文件的通道

FileChanle使用示例:

  1.  RandomAccessFile randomAccessFile = new RandomAccessFile("/Users/gongdezhe/Desktop/download/test", "rw");
  2.        //可以进行读写操作
  3.        FileChannel fileChannel = randomAccessFile.getChannel();
  4.        //写数据
  5.        byte[] bytes = "hello".getBytes();
  6.        //将数据交给buffer
  7.        ByteBuffer byteBuffer = ByteBuffer.allocate(40);
  8.        byteBuffer.put(bytes); //将数据写入到buffer
  9.        byteBuffer.flip();
  10.        //写入通道
  11.        fileChannel.write(byteBuffer);
  12.        //关闭通道
  13.        fileChannel.close();

Buffer:缓存

Java中的NIO的Buffer用与和Channel交互。Java堆NIO中的内存缓存包装成BUffer对象,提供了一系列方法,方便开始使用。

基本使用:

以读数据为例:

  1. 从channel将数据写入buffer
  2. 调用buffer.flip()进行读写切换
  3. 从buffer中读取数据
  4. 对 buffer清空clear()和compact()

Buffer 实现

Buffer底层是通过特定类型(long,double...)的数组存储数据,借助4个指针来操作的。

  1. // Invariants: mark <= position <= limit <= capacity
  2. private int mark = -1;
  3. private int position = 0;
  4. private int limit;
  5. private int capacity;

**capacity:**buffer是有固定大小的,即capacity表示底层数组的大小

**position:**取决于读写模式

**写操作:**将数据写入buffer,每写入数据position会移动一位,初始值为0,最大值为capacity

**读操作:**会从某个特定位置读,当读写模式切换后,position会被重置为0,读取数据时,position移动表示下一个读取的位置

**limit:**写模式下,表示最多王buffer中写入的数据,最大写入的位置capacity,此时limit=capacity;读模式下,limit表示最多能读取到的数据。

**mark:**读写过程中指针变化

Buffer 类型:

buffer缓冲区的分配:

Buffer分配空间分两种,一种是在堆上开辟的空间,一种是在堆外开辟的空间(零拷贝技术)

Buffer对象的创建:

以ByteBuffer为例介绍:

ByteBuffer allocate(int capacity):在堆上创建指定大小的缓冲区

ByteBuffer wrap(byte[] array):通过byte数组创建一个缓冲区

ByteBuffer wrap(byte[] array,int offset, int length),截取bytebuffer部分内容创建缓冲区

ByteBuffer allocateDirect(int capacity) 在堆外内存创建一个指定大小的缓冲区

Buffer中的方法:

向buffer中写数据:

1、从channel写到buffer:channel.read(buffer);

2、通过buffer的put方法:buffer.put()

flip()读写模式切换:通过flip操作将limit置为position,然后将position置为0

从buffer中读数据:

有两种方式:

1、从buffer中读取到channel:channel.write(buffer)

2、通过buffer的get操作处理:buffer.get()

selector:复用器

Selector的使用:

1.创建selector的实例:Selector selector = Selector.open();

2.将通道注册到复用器上

  1. //将socketChannel设置为非阻塞
  2. socketChannel.configureBlocking(false);
  3. //将socketChannel注册到Selector实例中,并关注可连接事件
  4. socketChannel.register(selector,SelectionKey.OP_CONNECT*);

注册的channel是非阻塞的,SelectableChannel抽象类中提供了configureBlocking设置当前的通道是阻塞的还是非阻塞的。

3.使用复用器来监听事件是否完成

  1. //等待系统返回就绪事件
  2. selector.select();

select方法是会阻塞的,直至内核监听到注册的感兴趣事件发生才返回。

4.遍历感兴趣事件集合

  1. selector.selectedKeys().iterator()

通过selectedKeys返回的是SelectionKey的set集合,存放的是感兴趣事件的已准备就绪的。

5.如果还有关注的事件,就跳转至第3步继续循环等待

6.最终关闭复用器selector

SelectableChannel抽象类

SelectableChannel抽象类称之为可选择的通道,SelectableChannel是具有阻塞的和非阻塞的两种模式,在非阻塞模式下,永远不会阻塞IO操作,将会使用selector作为异步支持,即read和write操作都不会阻塞,可能立即返回,新创建的SelectableChannel总是处于阻塞的,需要手动调换用configureBlocking设置为false,如果选用selector多路复用器必须是channel为非阻塞的,将channel设置为非阻塞之后,其是不能改变,直至将SelectionKey注销掉。

SocketChannel、ServerSocketChannel、DatagramChannel都是SelectableChannel的子类,都是可以来设置为非阻塞的。FileChannel是不能设置非阻塞的,其不是SelectableChannel子类实现。

SelectionKey介绍

SelectionKey表示的是一个特定的通道对象和一个特定的复用器对象直接的注册关系。SelectionKey中对4个事件用4个常量表示:

  1. public static final int OP_READ = 1 << 0;
  2. public static final int OP_WRITE = 1 << 2;
  3. public static final int OP_CONNECT = 1 << 3;
  4. public static final int OP_ACCEPT = 1 << 4;

SelectionKey中提供的方法:

  1. public abstract SelectableChannel channel();
  2. //返回该SelectionKey对应通道
  3. public abstract Selector selector();
  4. //返回该SelectionKey注册的选择器
  5. public abstract boolean isValid();
  6. //判断该SelectionKey是否有效
  7. public abstract void cancel();
  8. //撤销该SelectionKey
  9. public abstract int interestOps();
  10. //返回SelectionKey的关注操作符
  11. //设置该SelectionKey的关注键,返回更改后新的SelectionKey
  12. public abstract SelectionKey interestOps(int ops);
  13. public abstract int readyOps();
  14. //返回SelectionKey的预备操作符
  15. //这里readyOps()方法返回的是该SelectionKey的预备操作符
  16. //判断该SelectionKey的预备操作符是否是OP_READ
  17. public final boolean isReadable() {
  18.    return (readyOps() & OP_READ) != 0;
  19. }
  20. //判断该SelectionKey的预备操作符是否是OP_WRITE
  21. public final boolean isWritable() {
  22.    return (readyOps() & OP_WRITE) != 0;
  23. }
  24. //判断该SelectionKey的预备操作符是否是OP_CONNECT
  25. public final boolean isConnectable() {
  26.    return (readyOps() & OP_CONNECT) != 0;
  27. }
  28. //判断该SelectionKey的预备操作符是否是OP_ACCEPT
  29. public final boolean isAcceptable() {
  30.    return (readyOps() & OP_ACCEPT) != 0;
  31. }
  32. //设置SelectionKey的附件
  33. public final Object attach(Object ob) {
  34.    return attachmentUpdater.getAndSet(this, ob);
  35. }
  36. //返回SelectionKey的附件
  37. public final Object attachment() {
  38.    return attachment;
  39. }

interestOps():对应的是该SelectionKey上注册的关注事件,通过register来完成感兴趣事件的集合

readyOps():通道上实际准备就绪的事件。

复用器的选择过程:

Selector抽象类中提供的方法:

Set<SelectionKey> keys()方法返回所有的注册的通道实例,一个selector上是可以注册多个通道,即返回所有注册的通道SelectionKey。

Set<SelectionKey> selectedKeys():返回的是所有准备就绪的通道实例。

选择过程:

select获取准备就绪的事件,返回就是已经有就是事件发生。

int select():阻塞到至少有一个通道上注册的事件就绪了;

int select(long timeout);和select()一样,会则塞等待事件就绪,最长阻塞时间为timeout;

int selectNow():非阻塞,调用会立即返回。

停止选择过程:

wakeup():通过调用wakeup()方法让调用select()处于阻塞的方法立即返回;

close():关闭selector实例,使得任意一个处于select则塞的线程都会被唤醒,会将selector上所有的channel都被注销(cancel)。

选择过程(图示):

Java中Selector复用器本质是对操作系统提供的IO复用模型(select()、poll()、epoll())封装

相关文章