并发编程篇:AQS的原理,有这一篇就够了~~

x33g5p2x  于2021-12-18 转载在 其他  
字(5.6k)|赞(0)|评价(0)|浏览(526)

队列同步器AQS,用来构建锁或者其他同步组件的基础框架,它有int类型的 state变量表示同步状态,以及内置的FIFO队列来完成志愿获取线程的排队工作。

基本组成

队列同步器接口
1、同步状态方法
  1. private volatile int state;//共享变量,使用volatile修饰保证线程可见性

AQS提供了3个修改状态的方法:

  • getState(): 获取同步状态
  • setState(int newState) : 设置同步状态
  • compareAndSetState(int expect, int update) :使用CAS设置当前的状态,保证了状态修改的原子性
2、可以重写的方法
方法作用
protected boolean tryAcquire(int arg)这是独占式获取同步状态的方法,该方法的实现需要查询到当前的同步状态,同时做出相应判断,最后再通过CAS设置同步状态
protected boolean tryRelease(int arg)这是独占式释放同步状态的方法,让那些等待获取同步状态的线程能够有机会获取同步状态
protected int tryAcquireShared(int arg)这是共享式获取同步状态的方法,返回的值大于等于0,说明就获取成功了,否则,就是获取失败
protected boolean tryReleaseShared(int arg)这是共享式释放同步状态的方法,让那些等待获取同步状态的线程能够有机会获取同步状态
protected boolean isHeldExclusively()这个方法用来判断当前同步器是否在独占模式下被线程占用,它会取出占用的线程和当前线程做个比较,看下是否相等
3、AQS的模版方法

模版方法分为3类:独占式获取和释放同步状态、共享式获取和释放同步状态、查询同步队列中的等待线程情况

方法作用
public final void acquire(int arg)这个方法是独占式获取同步状态的方法,该方法会调用重写的tryAcquire()方法来配合做结果判断,如果当前线程获取同步状态是成功的,该方法就会返回,如果不成功就会进入同步队列等待
public final void acquireInterruptibly(int arg)和acquire()方法类似,不过它多了个响应中断的能力,当前线程未获取到同步状态,一样会进入同步队列,但是如果当前线程被中断,那就会抛出InterruptedException异常并返回
public final boolean tryAcquireNanos(int arg, long nanosTimeout)在acquireInterruptibly()方法的基础上又加了超时的限制,这样可以让在规定时间内无法获取到同步器状态的线程直接返回false,当然了如果获取到则返回true。
public final void acquireShared(int arg)这是共享式获取同步状态的方法,该方法会调用重写的tryAcquireShared()方法来配合做结果判断,如果当前线程获取同步状态是成功的,该方法就会返回,如果不成功就会进入同步队列等待,不过这个方法支持同一时刻可以有多个线程获取同步状态。
public final void acquireSharedInterruptibly(int arg)和acquireShared()方法类似,不过它多了个响应中断的能力,当前线程未获取到同步状态,一样会进入同步队列,但是如果当前线程被中断,那就会抛出InterruptedException异常并返回
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)在acquireSharedInterruptibly()方法的基础上又加了超时的限制,这样可以让在规定时间内无法获取到同步器状态的线程直接返回false,当然了如果获取到则返回true。
public final boolean release(int arg)这也是独占式的方法,用来释放同步状态,然后将同步队列中的第一个节点包含的线程唤醒
public final boolean releaseShared(int arg)这也是共享式的方法,用来释放同步状态,然后将同步队列中的第一个节点包含的线程唤醒
public final Collection getQueuedThreads()获取等待在同步队列上的线程集合

同步队列实现

同步队列依赖内部的同步队列来完成状态的管理,当前线程获取同步队列失败时,同步器会将当前线程以及等待状态等信息构造一个节点(Node)并放入同步队列中,同时会阻塞当前线程,当前状态释放时候,会把首节点中的线程唤醒,使其再次获取同步状态

节点(Node)组成
1、int waitStatus

等待状态:包含如下状态

  1. CANCELLED ,值为1,由于在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消 等待,节点进入该状态将不会变化
  2. SIGNAL 值为-1,后继节点的线程处于等待状态,而当前节点的线程若释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
  3. CONDITION 值为-2,节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中
  4. PROPAGATE 值为-3,表示下一次共享式同步状态获取将会无条件的被传播下去
  5. INITIAL 值为0,初始状态
2、int waitStatus

前驱节点,当节点加入到同步队列时被设置(尾部添加)

3、Node next

后继节点

4、Node nextWaiter

等待队列中的后继节点。若当前节点是共享的,那么这个字段将是一个SHARED常量,也是说节点类别(独占和共享)和等待队列中的后继节点共用同一个字段

5、Thread thread

获取同步状态的线程

队列同步器的基本结构

独占式同步状态获取与释放

1、获取独占式同步状态

A、主要过程

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }

如代码所示:

  1. tryAcquire(arg),线程安全获取同步状态;获取成功返回
  2. 如果获取失败,构造同步节点,,通过addWaiter加入同步队列的尾部,最后调用acquireQueued,以死循环的方式获取同步状态

B、addWaiter方法:节点构造并加入同步队列中,构造的节点是独占式的(Node.EXCLUSIVE)

  1. private Node addWaiter(Node mode) {
  2. Node node = new Node(Thread.currentThread(), mode);
  3. // Try the fast path of enq; backup to full enq on failure
  4. Node pred = tail;
  5. if (pred != null) {
  6. node.prev = pred;
  7. if (compareAndSetTail(pred, node)) {
  8. pred.next = node;
  9. return node;
  10. }
  11. }
  12. enq(node);
  13. return node;
  14. }

C、acquireQueued:节点进入同步队列中自旋,每个节点都在检查,当条件满足了,获取同步状态,就可以自旋过程退出,否则留在字段过程中,并阻塞节点的线程

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;
  3. try {
  4. boolean interrupted = false;
  5. for (;;) {
  6. final Node p = node.predecessor();
  7. if (p == head && tryAcquire(arg)) {
  8. setHead(node);
  9. p.next = null; // help GC
  10. failed = false;
  11. return interrupted;
  12. }
  13. if (shouldParkAfterFailedAcquire(p, node) &&
  14. parkAndCheckInterrupt())
  15. interrupted = true;
  16. }
  17. } finally {
  18. if (failed)
  19. cancelAcquire(node);
  20. }
  21. }

D、acquire流程图如下

2、释放独占式同步状态

当前线程获取同步状态并执行了相应的逻辑之后,就需要释放同步状态,时候去节点获取同步状态

  1. public final boolean release(int arg) {
  2. if (tryRelease(arg)) {
  3. Node h = head;
  4. if (h != null && h.waitStatus != 0)
  5. unparkSuccessor(h);
  6. return true;
  7. }
  8. return false;
  9. }
  10. private void unparkSuccessor(Node node) {
  11. int ws = node.waitStatus;
  12. if (ws < 0)
  13. compareAndSetWaitStatus(node, ws, 0);
  14. Node s = node.next;
  15. // 找到第一个节点
  16. if (s == null || s.waitStatus > 0) {
  17. s = null;
  18. for (Node t = tail; t != null && t != node; t = t.prev)
  19. if (t.waitStatus <= 0)
  20. s = t;
  21. }
  22. if (s != null)
  23. LockSupport.unpark(s.thread);
  24. }

共享式同步状态获取与释放

1、共享式同步状态获取

主要过程:
A、调用tryAcquireShared尝试获取同步状态、如果返回值>0,则获取成功
B、否则,在自旋过程中,如果是头节点,并尝试同步状态成功,如果获取状态>0,唤醒后继节点,并退出自旋

  1. public final void acquireShared(int arg) {
  2. if (tryAcquireShared(arg) < 0)
  3. doAcquireShared(arg);
  4. }
  5. private void doAcquireShared(int arg) {
  6. final Node node = addWaiter(Node.SHARED);
  7. boolean failed = true;
  8. try {
  9. boolean interrupted = false;
  10. for (;;) {
  11. final Node p = node.predecessor();
  12. if (p == head) {
  13. int r = tryAcquireShared(arg);
  14. if (r >= 0) {
  15. setHeadAndPropagate(node, r);
  16. p.next = null; // help GC
  17. if (interrupted)
  18. selfInterrupt();
  19. failed = false;
  20. return;
  21. }
  22. }
  23. if (shouldParkAfterFailedAcquire(p, node) &&
  24. parkAndCheckInterrupt())
  25. interrupted = true;
  26. }
  27. } finally {
  28. if (failed)
  29. cancelAcquire(node);
  30. }
  31. }
2、共享式同步状态释放

释放同步状态,并唤醒后继节点

  1. public final boolean releaseShared(int arg) {
  2. if (tryReleaseShared(arg)) {
  3. doReleaseShared();
  4. return true;
  5. }
  6. return false;
  7. }

独占式超时获取同步状态

获取独占超时同步状态

doAcquireNanos(int arg, long nanosTimeout)可以超时获取同步状态,也就是在固定时间段内获取同步状态,如果获取成功则返回true;发否则返回fase

  1. private boolean doAcquireNanos(int arg, long nanosTimeout)
  2. throws InterruptedException {
  3. if (nanosTimeout <= 0L)
  4. return false;
  5. final long deadline = System.nanoTime() + nanosTimeout;
  6. final Node node = addWaiter(Node.EXCLUSIVE);
  7. boolean failed = true;
  8. try {
  9. for (;;) {
  10. final Node p = node.predecessor();
  11. if (p == head && tryAcquire(arg)) {
  12. setHead(node);
  13. p.next = null; // help GC
  14. failed = false;
  15. return true;
  16. }
  17. nanosTimeout = deadline - System.nanoTime();
  18. if (nanosTimeout <= 0L)
  19. return false;
  20. if (shouldParkAfterFailedAcquire(p, node) &&
  21. nanosTimeout > spinForTimeoutThreshold)
  22. LockSupport.parkNanos(this, nanosTimeout);
  23. if (Thread.interrupted())
  24. throw new InterruptedException();
  25. }
  26. } finally {
  27. if (failed)
  28. cancelAcquire(node);
  29. }
  30. }

独占式获取同步状态流程图

欢迎关注公众号"程序员ZZ的源码",一起学习

相关文章