Java集合(四): Queue源码剖析

x33g5p2x  于2021-10-20 转载在 Java  
字(8.7k)|赞(0)|评价(0)|浏览(740)

写在前面

Queue用于模拟队列这种数据结构,队列通常是指“先进先出”(FIFO=first in first out)的容器。新元素插入(offer)到队列的尾部,访问元素(poll)操作会返回队列头部的元素。通常,队列不允许随机访问队列中的元素。

为什么用?有什么好处?

在多线程领域:所谓阻塞,在某些情况下会挂起线程(即线程阻塞),一旦条件满足,被挂起的线程优惠被自动唤醒

为什么需要使用BlockingQueue

好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为BlockingQueue都一手给你包办好了
在concurrent包 发布以前,在多线程环境下, 我们每个程序员都必须自己去控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度.

种结构就相当于我们排队上车,先到的站在前面,先上车,后到的得等前面先上车了再上车。

、1、Queue实现

Java中的Queue的实现有三种方式:

  • 阻塞队列
  • 非阻塞队列
  • 双向队列

类图UML

1.1、阻塞队列BlockingQueue(5)

1.1.1、BlockingQueue说明       

BlockingQueue 通常用于一个线程生产对象,而另外一个线程消费这些对象的场景。一个线程往里边放,另外一个线程从里边取的一个 BlockingQueue。

阻塞队列是一个可以阻塞的先进先出集合,比如某个线程在空队列获取元素时、或者在已存满队列存储元素时,都会被阻塞。
说白了就是干等着,啥也干不了。排队上车的时候,你就只能一直站在在那里排队,你要是想去上厕所回来你的位置都不见了,还得重新排队。

1.1.2、BlockingQueue 接口实现类:

  • ArrayBlockingQueue :基于数组的有界阻塞队列,必须指定大小。
  • LinkedBlockingQueue :基于单链表的无界阻塞队列,不需指定大小。
  • PriorityBlockingQueue :基于最小二叉堆的无界、优先级阻塞队列。
  • DelayQueue:基于延迟、优先级、无界阻塞队列。
  • SynchronousQueue :基于 CAS 的阻塞队列。

1.1.3、方法:

  • add():新增一个元索,假如队列已满,则抛异常Illegal state exception。
  • remove():执行删除操作,返回队列头部的元素,假如队列为空,则抛异常。
  • offer():新增一个元素,假如队列没满则返回 true,假如队列已满,则返回 false。
  • poll():执行删除操作,返回队列头部的元素,假如队列为空,则返回 null。
  • put():新增一个元素,假如队列满,则阻塞。
  • take():执行删除操作,返回队列头部的元素,假如队列为空,则阻塞。
  • element():获取队列头部一个元素,假如队列为空,则抛异常NoSuchElementException。
  • peek():获取队列头部一个元素,假如队列为空,则返回 null。

1.2、非阻塞队列AbstractQueue(2)

1.2.1、AbstractQueue说明       

    非阻塞队列是使用CAS(compare and set)机制实现,类似 volatile,并发性能好。
人太多了,很多现在开始流行取号,先取个号,看着离我这号太远了,我出去溜达溜达一下再来。

1.2.2、AbstractQueue接口实现类:

常用的阻塞队列有 PriorityQueue 和 ConcurrentLinkedQueue。

  • PriorityQueue :基于优先级的无界优先级队列
  • ConcurrentLinkedQueue:基于双向链表结构的无界并发队列

1.2.3、方法

1.3、双端队列Deque(2)

Deque 是一个既可以在头部操作元素,又可以为尾部操作元素,俗称为双向(双端)队列。Deque 继承自 Queue,Deque 实现类有 LinkedList、 ArrayDeque、ConcurrentLinkedDeque 等

  • LinkedList:基于单链表的无界双端队列,允许元素为 null
  • ArrayDeque:基于数组的有界双端队列,不允许 null。不是线程安全的。当作为栈使用时,性能比Stack好;当作为队列使用时,性能比LinkedList好。

2、Queue分类

2.1、阻塞队列和非阻塞队列

**  阻塞队列**(Blocking Queue)提供了可阻塞的 put 和 take 方法,它们与可定时的 offer 和 poll 是等价的。如果队列满了 put 方法会被阻塞等到有空间可用再将元素插入;如果队列是空的,那么take 方法也会阻塞,直到有元素可用。当队列永远不会被充满时,put 方法和 take 方法就永远不会阻塞。

public class BlockingQueue {

    public static void main(final String[] args) {
        final ArrayBlockingQueue queue = new ArrayBlockingQueue(5);

        Thread thread1 = new Thread(){
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    try {
                        queue.put(i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(new Date() + " | ArrayBlockingQueue Size:" + queue.size());
                }
                System.out.println(new Date() + " | For End.");
            }
        };

        thread1.start();

        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if(!queue.isEmpty()){
                    try {
                        queue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();

    }
}

非阻塞队列也就是普通队列,它的名字中不会包含 BlockingQueue 关键字,并且它不会包含 put 和 take 方法,当队列满之后如果还有新元素入列会直接返回错误,并不会阻塞的等待着添加元素,如下图所示:

非阻塞队列的典型代表是 ConcurrentLinkedQueue 和PriorityQueue

2.2、有界队列和无界队列

有界队列:是指有固定大小的队列,比如设定了固定大小的 ArrayBlockingQueue,又或者大小为 0 的 SynchronousQueue


无界队列:指的是没有设置固定大小的队列,但其实如果没有设置固定大小也是有默认值的,只不过默认值是 Integer.MAX_VALUE,当然实际的使用中不会有这么大的容量(超过 Integer.MAX_VALUE),所以从使用者的角度来看相当于 “无界”的。

2.3、普通队列

普通队列(Queue)是指实现了先进先出的基本队列,例如 ArrayBlockingQueue 和 LinkedBlockingQueue,其中 ArrayBlockingQueue是用数组实现的普通队列,如下图所示:

而 LinkedBlockingQueue 是使用链表实现的普通队列,如下图所示:

注意:一般情况下 offer() 和 poll() 方法配合使用,put() 和 take() 阻塞方法配合使用,add() 和 remove() 方法会配合使用,程序中常用的是 offer() 和 poll() 方法,因此这两个方法比较友好,不会报错

public class BlockingQueue {

    public static void main(final String[] args) {

        LinkedBlockingQueue queue = new LinkedBlockingQueue(2);
        queue.offer("Hello");
        queue.offer("Java");
        queue.offer("中文社群");
        while (!queue.isEmpty()) {
            System.out.println(queue.poll());
        }
    }

2.4、双端队列

双端队列(Deque)是指队列的头部和尾部都可以同时入队和出队的数据结构,如下图所示:

** BlockingDeque**

java.util.concurrent 包里的 BlockingDeque 接口表示一个线程安放入和提取实例的双端队列。

BlockingDeque 类是一个双端队列,在不能够插入元素时,它将阻塞住试图插入元素的线程;在不能够抽取元素时,它将阻塞住试图抽取的线程。 deque(双端队列) 是 "Double Ended Queue" 的缩写。因此,双端队列是一个你可以从任意一端插入或者抽取元素的队列。

在线程既是一个队列的生产者又是这个队列的消费者的时候可以使用到 BlockingDeque。如果生产者线程需要在队列的两端都可以插入数据,消费者线程需要在队列的两端都可以移除数据,这个时候也可以使用 BlockingDeque。BlockingDeque 图解:

BlockingDeque 的方法

一个 BlockingDeque - 线程在双端队列的两端都可以插入和提取元素。 一个线程生产元素,并把它们插入到队列的任意一端。如果双端队列已满,插入线程将被阻塞,直到一个移除线程从该队列中移出了一个元素。如果双端队列为空,移除线程将被阻塞,直到一个插入线程向该队列插入了一个新元素。

public class BlockingQueue {

    public static void main(final String[] args) {
        // 创建一个双端队列
        LinkedBlockingDeque deque = new LinkedBlockingDeque();
        deque.offer("offer"); // 插入首个元素
        deque.offerFirst("offerFirst"); // 队头插入元素
        deque.offerLast("offerLast"); // 队尾插入元素
        while (!deque.isEmpty()) {
            // 从头遍历打印
            System.out.println(deque.poll());
        }
    }
offerFirst
offer
offerLast

 2.5、优先队列

优先队列(PriorityQueue)是一种特殊的队列,它并不是先进先出的,而是优先级高的元素先出队。优先队列是根据二叉堆实现的,二叉堆的数据结构如下图所示:

**        优先队列的作用是能保证每次取出的元素都是队列中权值最小的**(Java的优先队列每次取最小)。这里牵涉到了大小关系,元素大小的评判可以通过元素本身的自然顺序(natural ordering),也可以通过构造时传入的比较器。

二叉堆分为两种类型:一种是最大堆一种是最小堆。以上展示的是最大堆,在最大堆中,任意一个父节点的值都大于等于它左右子节点的值。
因为优先队列是基于二叉堆实现的,因此它可以将优先级最好的元素先出队。

public class BlockingQueue {

    public static void main(final String[] args) {
        PriorityQueue queue = new PriorityQueue<>(10,
                new Comparator<Student>(){
                    @Override
                    public int compare(Student v1, Student v2) {
                        return v2.getLevel() - v1.getLevel();
                    }
                });

        queue.offer(new Student(1,"a",5));
        queue.offer(new Student(1,"a",1));
        queue.offer(new Student(1,"a",3));

        while(!queue.isEmpty()) {
            System.out.println(queue.poll().toString());
        }

    }

    @Data
    private static class Student {
        private int id;
        private String name;
        private int level;

        public Student(int id, String name, int level) {
            this.id = id;
            this.name = name;
            this.level = level;
        }
    }

}

2.6、延迟队列

延迟队列(DelayQueue)是基于优先队列 PriorityQueue 实现的,它可以看作是一种以时间为度量单位的优先的队列,当入队的元素到达指定的延迟时间之后方可出队。

2.7、其他队列

在 Java 的队列中有一个比较特殊的队列 SynchronousQueue,它的特别之处在于它内部没有容器,每次进行 put() 数据后(添加数据),必须等待另一个线程拿走数据后才可以再次添加数据,它的使用示例如下:

public class SynchronousQueueTest {

    public static void main(String[] args) {
        SynchronousQueue queue = new SynchronousQueue();

        // 入队
        new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                try {
                    System.out.println(new Date() + ",元素入队");
                    queue.put("Data " + i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }).start();

        // 出队
        new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(1000);
                    System.out.println(new Date() + ",元素出队:" + queue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}
Wed Oct 20 01:54:52 CST 2021,元素入队
Wed Oct 20 01:54:53 CST 2021,元素出队:Data 0
Wed Oct 20 01:54:53 CST 2021,元素入队
Wed Oct 20 01:54:54 CST 2021,元素出队:Data 1
Wed Oct 20 01:54:54 CST 2021,元素入队
Wed Oct 20 01:54:55 CST 2021,元素出队:Data 2

从上述结果可以看出,当有一个元素入队之后,只有等到另一个线程将元素出队之后,新的元素才能再次入队。

3、Queue使用场景

很典型的JDK自带的线程池中就大量使用了Queue来存储任务。

4、BlockingQueue阻塞队列是线程安全的吗?

BlockingQueue的三个实现类, 发现对应的方法中都使用了锁,ReentratLock, 所以不会出现线程安全问题。

4.1、ArrayBlocking

//ArrayBlockingQueue的put()方法
    public void put(E e) throws InterruptedException {
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;
        //获取锁
        lock.lockInterruptibly();
        try {
            while (count == items.length)
            	//队列满了, 进入阻塞状态 
                notFull.await();
            enqueue(e);
        } finally {
            //释放锁
            lock.unlock();
        }
    }

//ArrayBlockingQueue的take()方法
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        //获取锁
        lock.lockInterruptibly();
        try {
            while (count == 0)
	            //队列为空,进入阻塞状态
                notEmpty.await();
            //弹出元素
            return dequeue();
        } finally {
        	//释放锁
            lock.unlock();
        }
    }

4.2、LinkedBlockingQueue

//LinkedBlockingQueue的put()方法
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final int c;
        final Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //获取锁
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            //元素入队
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
	        //释放锁
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }
	//LinkedBlockingQueue的take()方法
    public E take() throws InterruptedException {
        final E x;
        final int c;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        //获取锁
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
	        //释放锁
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

4.3、PriorityBlockingQueue

 

//PriorityBlockingQueue的put()方法
public void put(E e) {
		//锁的操作在offer()方法中
        offer(e); // never need to block
    }
//PriorityBlockingQueue的offer()方法
public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        //获取锁
        lock.lock();
        int n, cap;
        Object[] es;
        while ((n = size) >= (cap = (es = queue).length))
            tryGrow(es, cap);
        try {
            final Comparator<? super E> cmp;
            if ((cmp = comparator) == null)
                siftUpComparable(n, e, es);
            else
                siftUpUsingComparator(n, e, es, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
	        //释放锁
            lock.unlock();
        }
        return true;
    }
//PriorityBlockingQueue的take()方法
public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        //获取锁
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            //释放锁
            lock.unlock();
        }
        return result;
    }

参考:Java中,Queue的3种方式实现方式 - 知乎
      图解Java中的5大队列!(干货收藏) - 知乎Java队列(Queue)了解及使用 - 简书

相关文章