Java 阻塞队列

x33g5p2x  于2021-12-09 转载在 Java  
字(2.9k)|赞(0)|评价(0)|浏览(486)

定义

阻塞队列,它是一个特殊的队列,符合先进先出的特点,是并发编程中的一个重要基础组件
阻塞队列的一个典型应用场景就是 “生产者消费者模型”;其作为生产者—消费者模型中的交易场所

简单介绍 生产者—消费者 模型

以包饺子为例:
(假设面和馅是备好的,且 擀面杖只有一个~)

方法1: A,B,C 三个人包饺子;这三个人分别进行 擀皮 + 包饺子 过程
该方法的锁竞争(擀面杖只有一个)太激烈

方法2: 一个人负责擀皮,另外两个负责包饺子,例:A 负责擀皮,B 和 C 负责包饺子
这就是"生产者—消费者 模型"
A: 生产者 — (生产饺子皮)
B,C:消费者 — (消费饺子皮)
此处应还有一个"交易场所":放饺子皮的东西,比如:大盘子~

阻塞队列就是生产者—消费者生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取.

阻塞队列特点:

  • 若入队列操作太快,队列满了,继续入队列,就会阻塞;一直阻塞到其他线程去消费队列了,才能继续入队列 (大盘子上很快就放满了饺子皮,生产者就要等待,一直等到消费者消费了一些数据(饺子皮),即:交易场所上有了空位,才能继续生产)
  • 若出队列操作太快,队列空了,继续出队列,也会阻塞;一直阻塞到其他线程生产了元素,才能继续出队列 (大盘子上很快就空了,饺子皮很快被"消费"完了,消费者就要等待,一直等到生产者生产了新的元素(饺子皮),消费者才能继续消费)

代码实现阻塞队列

实现阻塞队列前,我们先思考普通队列是如何实现的??
有两种方式:
1.基于链表;
2.基于数组 — 循环队列

以基于数组为例:
即,通过循环队列实现:

  1. class BlockingQueue{
  2. private int[] array = new int[10];
  3. // [head,tail)
  4. // 两者重合时,队列可能为空,也可能为满
  5. private volatile int head = 0;
  6. private volatile int tail = 0;
  7. private volatile int size = 0; // 有效元素个数
  8. /* * 阻塞队列 入队列 * 为了和普通队列入队列区分,使用 put * */
  9. public void put(int value) throws InterruptedException {
  10. synchronized (this){
  11. // 若队列满了, 阻塞等待, 等下面的出队列操作调用 notify 方法后才可继续执行
  12. if(size == array.length){
  13. wait();
  14. }
  15. array[tail] = value;
  16. tail++;
  17. if(tail == array.length){
  18. tail = 0;
  19. }
  20. size++;
  21. // 唤醒 出队列操作
  22. notify();
  23. }
  24. }
  25. /* * 阻塞队列 出队列 * 为了和普通队列出队列区分,使用 take * */
  26. public int take() throws InterruptedException {
  27. int ret = -1;
  28. synchronized (this){
  29. // 若队列为空, 阻塞等待, 等到有元素入队列再开始
  30. if(size == 0){
  31. wait();
  32. }
  33. ret = array[head];
  34. head++;
  35. if(head == array.length){
  36. head = 0;
  37. }
  38. size--;
  39. // 唤醒 入队列操作
  40. notify();
  41. }
  42. return ret;
  43. }
  44. }

代码解析:

上述两个 wait( ) 是一定不可能被同时调用的!!

  • 入队列操作的 wait( )
    当队列已满时,即:size == array.length;若有线程调用 put 方法,就让其执行 wait 方法,使该线程阻塞等待,直到有其他线程调用 take 方法,取出元素后,调用 notify 方法将刚才调用 put 方法产生阻塞的线程唤醒,接着继续执行 put 后续的操作
  • 出队列操作的 wait( )
    当队列为空时,即:size == 0;若有线程调用 take 方法,就让其执行 wait 方法,使该线程阻塞,直到有其他线程调用 put 方法,插入元素后,调用 notify 方法将刚才调用 take 方法产生阻塞的线程唤醒,接着继续执行 take 后续的操作

测试代码:

创建两个线程,分别模拟 生产者 和 消费者

  1. 消费的快,生产的慢 —— 预期看到:消费者线程会阻塞等待,有生产元素后消费者才能消费
  2. 消费的慢,生产的快 —— 预期看到::生产者线程会阻塞等待,消费者消费元素后,生产者才能继续生产

以第 2 种为例:

  1. public static void main(String[] args) {
  2. BlockingQueue blockingQueue = new BlockingQueue();
  3. // 创建两个线程,分别模拟 生产者 和 消费者
  4. Thread producer = new Thread(){
  5. @Override
  6. public void run(){
  7. for (int i = 0; i < 10000; i++) {
  8. try {
  9. blockingQueue.put(i);
  10. System.out.println("生产元素: " + i);
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. }
  15. }
  16. };
  17. producer.start();
  18. Thread consumer = new Thread(){
  19. @Override
  20. public void run(){
  21. while (true){
  22. try {
  23. int ret = blockingQueue.take();
  24. System.out.println("消费元素: " + ret);
  25. Thread.sleep(500);
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. }
  29. }
  30. }
  31. };
  32. consumer.start();
  33. }

截取部分输出结果:

假设把上述代码中的 notify( ) 改成 notifyAll( ),此时会发生什么??

假设现在有三个线程,其中一个线程生产,两个线程消费,且消费速度快于生产速度

所以,两个消费者线程都触发了 wait 操作,也就是都发生了阻塞;当我们调用 notifyAll( ) ,会将上述两个线程都唤醒,然后这两个线程都去尝试重新竞争锁
假设:
消费者1,先获取到锁,于是执行出队列操作(执行完毕释放锁)
消费者2,后获取到锁,于是也会执行后续的出队列操作,但是刚才生产者生产的一个元素,已经被消费者1线程 给取走了,即,当前实际是一个空的队列,若强行执行出队列操作,就会出现逻辑上的错误!!

改正方法:
将 wait( ) 方法包裹的 if 改为 while

  1. // 入队列
  2. while (size == array.length){
  3. wait();
  4. }
  5. // 出队列
  6. while (size == array.length){
  7. wait();
  8. }

此时两个消费者线程尝试竞争锁
消费者1,先获取到锁,wait( ) 就返回了,再次执行 while 中的条件(由于当前生产者线程生产了一个元素,size 不为0 ),循环退出,之后,消费者1 就可以执行后续出队列操作,执行完毕后,释放锁
消费者2,后获取到锁,wait( ) 返回,再次执行 while 中的条件(由于刚才的消费者1 已经把生产的元素取走了,size 又是 0),循环继续执行,又一次调用 wait( ),只能继续等…

相关文章

最新文章

更多