并发编程系列之掌握Condition接口使用

x33g5p2x  于2021-12-30 转载在 其他  
字(3.0k)|赞(0)|评价(0)|浏览(352)

并发编程系列之掌握Condition接口使用

1、什么是Condition接口

Condition是jdk的juc包中提供的并发等待api,俗称条件等待,条件变量,用于在Lock中提供synchronized加Object的wait/notify等待通知模式。
注意,Condition只是一个api接口,具体实现还是依赖于具体的lock类,比如使用new ReentrantLock().newCondition();

  • Object中的wait()notify()notifyAll()和synchronized配合使用,可以唤醒一个或者全部
  • Condition需要和lock实现类配合使用,一个Lock实例可以创建多个Condition,一个条件一个等待集合,可根据条件精确控制等待线程

多线程读取队列,写入数据后,唤醒读取线程继续执行。读取数据后,唤醒写线程继续执行

2、Condition接口常用方法

用idea编辑器查看Condition方法,如图:

  • await():当前线程在接到信号或被中断之前一直处于等待状态
  • await(long time, TimeUnit unit):当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
  • awaitNanos(long nanosTimeout):当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,如果在nanosTimesout之前唤醒,那么返回值 = nanosTimeout - 消耗时间,如果返回值 <= 0 ,则可以认定它已经超时了。
  • awaitUninterruptibly():当前线程在接到信号之前一直处于等待状态。该方法对中断不敏感
  • awaitUntil(Date deadline):当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回true,否则表示到了指定时间,返回返回false。
  • signal():唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。
  • signal()All:唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。

2、Condition例子

利用Condition实现生产者消费者模式:

import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionLockQueue {

    private LinkedList<Object> queue;
    private volatile int capacity;
    private Condition productCondition = null;
    private Condition consumeCondition = null;
    private Lock lock;

    public ConditionLockQueue(int capacity) {
        this.queue = new LinkedList<Object>();
        this.capacity = capacity;
        lock = new ReentrantLock();
        this.productCondition  = lock.newCondition();
        this.consumeCondition = lock.newCondition();
    }

    public void put(Object obj) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                productCondition.await();
                System.out.println("产品仓库满了,不能生产!");
            }
            queue.add(obj);
            System.out.println("[Producer]: " + obj+ ",共有:"+queue.size());
            consumeCondition.signal();
        } finally {
            lock.unlock();
        }
    }

    public Object sub() throws InterruptedException {
        Object obj = null;
        lock.lock();
        try {
            while (queue.size() == 0) {
                consumeCondition.await();
                System.out.println("没有产品了,需要生产!");
            }
            Object prod =  queue.remove(0);
            System.out.println("[Consumer]: " + prod + ",共有:"+queue.size());
            productCondition.signal();
        }finally {
            lock.unlock();
        }
        return obj;
    }

    public boolean isEmpty() {
        return (queue.size() == 0);
    }

    public static void main(String[] args) throws InterruptedException {
        Random random = new Random();
        ConditionLockQueue queue = new ConditionLockQueue(10);
        Runnable produceTask = () -> {
            for (; ;) {
                try {
                    queue.put(random.nextInt(10));
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        Runnable consumeTask = () -> {
            for (; ;) {
                try {
                    queue.sub();
                    Thread.sleep(random.nextInt(100));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        //3个生产者
        new Thread(produceTask).start();
        new Thread(produceTask).start();
        new Thread(produceTask).start();
        //3个消费者
        new Thread(consumeTask).start();
        new Thread(consumeTask).start();
        new Thread(consumeTask).start();

    }

}

附录参考资料

相关文章