阻塞队列最大的特性在于支持阻塞添加和阻塞删除方法:
阻塞添加:当阻塞队列已满时,队列会阻塞加入元素的线程,直到队列元素不满时才重新唤醒线程执行加入元素操作。
*
阻塞删除:但阻塞队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空再执行删除操作
Java 中的阻塞队列接口 BlockingQueue 继承自 Queue 接口,因此先来看看阻塞队列接口为我们提供的主要方法:
public interface BlockingQueue<E> extends Queue<E> {
// 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量)
// 在成功时返回 true,如果此队列已满,则抛IllegalStateException。
boolean add(E e);
// 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量)
// 如果该队列已满,则在到达指定的等待时间之前等待可用的空间,该方法可中断
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
//将指定的元素插入此队列的尾部,如果该队列已满,则一直等到(阻塞)。
void put(E e) throws InterruptedException;
//获取并移除此队列的头部,如果没有元素则等待(阻塞),直到有元素将唤醒等待线程执行该操作
E take() throws InterruptedException;
//获取并移除此队列的头部,在指定的等待时间前一直等到获取元素, //超过时间方法将结束
E poll(long timeout, TimeUnit unit) throws InterruptedException;
//从此队列中移除指定元素的单个实例(如果存在)。
boolean remove(Object o);
}
//除了上述方法还有继承自Queue接口的方法
//获取但不移除此队列的头元素,没有则跑异常NoSuchElementException
E element();
//获取但不移除此队列的头;如果此队列为空,则返回 null。
E peek();
//获取并移除此队列的头,如果此队列为空,则返回 null。
E poll();
这里我们把上述操作进行分类:
(1)插入方法:
(2)删除方法
(3)检查方法:
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** 存储数据的数组 */
final Object[] items;
/**获取数据的索引,主要用于take,poll,peek,remove方法 */
int takeIndex;
/**添加数据的索引,主要用于 put, offer, or add 方法*/
int putIndex;
/** 队列元素的个数 */
int count;
/** 控制并非访问的锁 */
final ReentrantLock lock;
/**notEmpty条件对象,用于通知take方法队列已有元素,可执行获取操作 */
private final Condition notEmpty;
/** notFull条件对象,用于通知put方法队列未满,可执行添加操作 */
private final Condition notFull;
/** 迭代器 */
transient Itrs itrs = null;
}
ArrayBlockingQueue 内部通过数组对象 items 来存储所有的数据,需要注意的是ArrayBlockingQueue 通过一个 ReentrantLock 来同时控制添加线程与移除线程的并发访问,这点与 LinkedBlockingQueue 区别很大(稍后会分析)。而对于 notEmpty 条件对象则是用于存放等待或唤醒调用 take() 方法的线程,告诉他们队列已有元素,可以执行获取操作。同理 notFull 条件对象是用于等待或唤醒调用 put() 方法的线程,告诉它们队列未满,可以执行添加元素的操作。takeIndex 代表的是下一个方法(take,poll,peek,remove)被调用时获取数组元素的索引,putIndex 则代表下一个方法(put, offer, or add)被调用时元素添加到数组中的索引。
put() 方法特点是阻塞添加,当队列满时通过条件对象来阻塞当前调用 put() 方法的线程,直到线程又再次被唤醒执行。总得来说添加线程的执行存在以下两种情况:一是队列已满,那么新到来的put 线程将添加到 notFull 的条件队列中等待;二是有移除线程执行移除操作,移除成功同时唤醒put线程。
具体代码如下:
//put方法,阻塞时可中断
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//该方法可中断
try {
//当队列元素个数与数组长度相等时,无法添加元素
while (count == items.length)
//将当前调用线程挂起,添加到notFull条件队列中等待唤醒
notFull.await();
enqueue(e);//如果队列没有满直接添加。。
} finally {
lock.unlock();
}
}
//入队操作
private void enqueue(E x) {
//获取当前数组
final Object[] items = this.items;
//通过putIndex索引对数组进行赋值
items[putIndex] = x;
//索引自增,如果已是最后一个位置,重新设置 putIndex = 0;
if (++putIndex == items.length)
putIndex = 0;
count++;//队列中元素数量加1
//唤醒调用take()方法的线程,执行元素获取操作。
notEmpty.signal();
}
take() 方法其实很简单,有就删除没有就阻塞,注意这个阻塞是可以中断的,如果队列没有数据那么就加入 notEmpty 条件队列等待(有数据就直接取走,方法结束),如果有新的put线程添加了数据,那么 put 操作将会唤醒 take 线程,执行 take 操作,图示如下:
具体代码如下:
//从队列头部删除,队列没有元素就阻塞,可中断
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//中断
try {
//如果队列没有元素
while (count == 0)
//执行阻塞操作
notEmpty.await();
return dequeue();//如果队列有元素执行删除操作
} finally {
lock.unlock();
}
}
//删除队列头元素并返回
private E dequeue() {
//拿到当前数组的数据
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//获取要删除的对象
E x = (E) items[takeIndex];
将数组中takeIndex索引位置设置为null
items[takeIndex] = null;
//takeIndex索引加1并判断是否与数组长度相等,
//如果相等说明已到尽头,恢复为0
if (++takeIndex == items.length)
takeIndex = 0;
count--;//队列个数减1
if (itrs != null)
itrs.elementDequeued();//同时更新迭代器中的元素数据
//删除了元素说明队列有空位,唤醒notFull条件对象添加线程,执行添加操作
notFull.signal();
return x;
}
LinkedBlockingQueue 是一个基于链表的阻塞队列,其内部维持一个基于链表的数据队列,但大小默认值为 Integer.MAX_VALUE,建议使用 LinkedBlockingQueue时手动传值,避免队列过大造成机器负载或者内存爆满等情况
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**
* 节点类,用于存储数据
*/
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
/** 阻塞队列的大小,默认为Integer.MAX_VALUE */
private final int capacity;
/** 当前阻塞队列中的元素个数 */
private final AtomicInteger count = new AtomicInteger();
/** 阻塞队列的头结点 */
transient Node<E> head;
/** 阻塞队列的尾节点 */
private transient Node<E> last;
/** 获取并移除元素时使用的锁,如take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** notEmpty条件对象,当队列没有数据时用于挂起执行删除的线程 */
private final Condition notEmpty = takeLock.newCondition();
/** 添加元素时使用的锁如 put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** notFull条件对象,当队列数据已满时用于挂起执行添加的线程 */
private final Condition notFull = putLock.newCondition();
}
从上述可看成,每个添加到 LinkedBlockingQueue 队列中的数据都将被封装成 Node 节点,添加的链表队列中,其中 head 和 last 分别指向队列的头结点和尾结点。与 ArrayBlockingQueue 不同的是,LinkedBlockingQueue 内部分别使用了 takeLock 和 putLock 对并发进行控制,也就是说,添加和删除操作并不是互斥操作,可以同时进行,可以大大提高吞吐量。这里再次强调如果没有给 LinkedBlockingQueue 指定容量大小,其默认值将是 Integer.MAX_VALUE,如果存在添加速度大于删除速度时候,有可能会内存溢出。至于 LinkedBlockingQueue 的实现原理图与 ArrayBlockingQueue 是类似的,除了对添加和移除方法使用单独的锁控制外,两者都使用了不同的 Condition 条件对象作为等待队列,用于挂起 take 线程和 put 线程。
public void put(E e) throws InterruptedException {
//添加元素为null直接抛出异常
if (e == null) throw new NullPointerException();
int c = -1;
//构建节点
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
//获取队列的个数
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//判断队列是否已满,如果已满则阻塞当前线程
while (count.get() == capacity) {
notFull.await();
}
//添加元素并更新count值
enqueue(node);
c = count.getAndIncrement();
//如果队列容量还没满,唤醒下一个添加线程,执行添加操作
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
//由于存在添加锁和消费锁,而消费锁和添加锁都会持续唤醒等待线程,因此count肯定会变化
//这里的if条件表示如果队列中还有1条数据,由于队列中存在数据那么就唤醒消费锁
if (c == 0)
signalNotEmpty();
}
这里的 put()方法做了两件事,第一件事是判断队列是否满,满了将当前线程加入等下队列,没满就将节点封装成 Node入队,然后再次判断队列添加完成后是否已满,不满就继续唤醒等到在条件对象 notFull 上的添加线程。第二件事是,判断是否需要唤醒等到在 notEmpty 条件对象上的消费线程。这里我们可能会有点疑惑,为什么添加完成后是继续唤醒在条件对象 notFull 上的添加线程而不是像 ArrayBlockingQueue 那样直接唤醒 notEmpty 条件对象上的消费线程?而又为什么要当 if (c == 0) 时才去唤醒消费线程呢?
public E take() throws InterruptedException {
E x;
int c = -1;
//获取当前队列大小
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();//可中断
try {
//如果队列没有数据,挂机当前线程到条件对象的等待队列中
while (count.get() == 0) {
notEmpty.await();
}
//如果存在数据直接删除并返回该数据
x = dequeue();
c = count.getAndDecrement();//队列大小减1
if (c > 1)
notEmpty.signal();//还有数据就唤醒后续的消费线程
} finally {
takeLock.unlock();
}
//满足条件,唤醒条件对象上等待队列中的添加线程
if (c == capacity)
signalNotFull();
return x;
}
take() 方法是一个可阻塞可中断的移除方法,主要做了两件事,一是,如果队列没有数据就挂起当前线程到 notEmpty 条件对象的等待队列中一直等待,如果有数据就删除节点并返回数据项,同时唤醒后续消费线程,二是尝试唤醒条件对象 notFull 上等待队列中的添加线程。
通过上述的分析,对于 ArrayBlockingQueue 和 LinkedBlockingQueue 的基本使用以及内部实现原理我们已较为熟悉了,这里我们就对它们两间的区别来个小结:
参考文章:https://blog.csdn.net/javazejian/article/details/77410889
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/a745233700/article/details/120691533
内容来源于网络,如有侵权,请联系作者删除!