并发编程系列之掌握Condition接口使用

x33g5p2x  于2021-12-02 转载在 其他  
字(2.9k)|赞(0)|评价(0)|浏览(362)

并发编程系列之掌握Condition接口使用

1、什么是Condition接口

Condition是jdk的juc包中提供的并发等待api,俗称条件等待,条件变量,用于在Lock中提供synchronized加Object的wait/notify等待通知模式。

  • Object中的wait()notify()notifyAll()和synchronized配合使用,可以唤醒一个或者全部
  • Condition需要和lock实现类配合使用,一个Lock实例可以创建多个Condition,一个条件一个等待集合,可根据条件精确控制等待线程

多线程读取队列,写入数据后,唤醒读取线程继续执行。读取数据后,唤醒写线程继续执行

2、Condition接口常用方法

用idea编辑器查看Condition方法,如图:

  • await():当前线程在接到信号或被中断之前一直处于等待状态
  • await(long time, TimeUnit unit):当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
  • awaitNanos(long nanosTimeout):当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,如果在nanosTimesout之前唤醒,那么返回值 = nanosTimeout - 消耗时间,如果返回值 <= 0 ,则可以认定它已经超时了。
  • awaitUninterruptibly():当前线程在接到信号之前一直处于等待状态。该方法对中断不敏感
  • awaitUntil(Date deadline):当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回true,否则表示到了指定时间,返回返回false。
  • signal():唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。
  • signal()All:唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。

2、Condition例子

利用Condition实现生产者消费者模式:

  1. import java.util.LinkedList;
  2. import java.util.Random;
  3. import java.util.concurrent.locks.Condition;
  4. import java.util.concurrent.locks.Lock;
  5. import java.util.concurrent.locks.ReentrantLock;
  6. public class ConditionLockQueue {
  7. private LinkedList<Object> queue;
  8. private volatile int capacity;
  9. private Condition productCondition = null;
  10. private Condition consumeCondition = null;
  11. private Lock lock;
  12. public ConditionLockQueue(int capacity) {
  13. this.queue = new LinkedList<Object>();
  14. this.capacity = capacity;
  15. lock = new ReentrantLock();
  16. this.productCondition = lock.newCondition();
  17. this.consumeCondition = lock.newCondition();
  18. }
  19. public void put(Object obj) throws InterruptedException {
  20. lock.lock();
  21. try {
  22. while (queue.size() == capacity) {
  23. productCondition.await();
  24. System.out.println("产品仓库满了,不能生产!");
  25. }
  26. queue.add(obj);
  27. System.out.println("[Producer]: " + obj+ ",共有:"+queue.size());
  28. consumeCondition.signal();
  29. } finally {
  30. lock.unlock();
  31. }
  32. }
  33. public Object sub() throws InterruptedException {
  34. Object obj = null;
  35. lock.lock();
  36. try {
  37. while (queue.size() == 0) {
  38. consumeCondition.await();
  39. System.out.println("没有产品了,需要生产!");
  40. }
  41. Object prod = queue.remove(0);
  42. System.out.println("[Consumer]: " + prod + ",共有:"+queue.size());
  43. productCondition.signal();
  44. }finally {
  45. lock.unlock();
  46. }
  47. return obj;
  48. }
  49. public boolean isEmpty() {
  50. return (queue.size() == 0);
  51. }
  52. public static void main(String[] args) throws InterruptedException {
  53. Random random = new Random();
  54. ConditionLockQueue queue = new ConditionLockQueue(10);
  55. Runnable produceTask = () -> {
  56. for (; ;) {
  57. try {
  58. queue.put(random.nextInt(10));
  59. Thread.sleep(1000);
  60. } catch (InterruptedException e) {
  61. e.printStackTrace();
  62. }
  63. }
  64. };
  65. Runnable consumeTask = () -> {
  66. for (; ;) {
  67. try {
  68. queue.sub();
  69. Thread.sleep(random.nextInt(100));
  70. } catch (InterruptedException e) {
  71. e.printStackTrace();
  72. }
  73. }
  74. };
  75. //3个生产者
  76. new Thread(produceTask).start();
  77. new Thread(produceTask).start();
  78. new Thread(produceTask).start();
  79. //3个消费者
  80. new Thread(consumeTask).start();
  81. new Thread(consumeTask).start();
  82. new Thread(consumeTask).start();
  83. }
  84. }

附录参考资料

相关文章