从ReentrantLock到AQS源码阅读

x33g5p2x  于2022-02-07 转载在 其他  
字(10.9k)|赞(0)|评价(0)|浏览(470)

ReentrantLock是使用AQS实现的一种可重入的独占锁,并且提供了公平和非公平两种策略。

ReentractLock的内部类Sync继承了AbstractQueuedSynchronizer抽象类,并且提供了两个内部类FairSync和NonfairSync(即公平锁与非公平锁的落地实现),而具体采用哪一种加锁策略则则是由ReentractLock的构造函数进行指定。

  1. public ReentrantLock() {
  2. sync = new NonfairSync();
  3. }
  4. public ReentrantLock(boolean fair) {
  5. sync = fair ? new FairSync() : new NonfairSync();
  6. }

从上面的构造函数可以看到ReentrantLock默认使用的是非公平锁,传入true启用公平锁,传入false则是非公平锁

  1. public void lock() {
  2. sync.lock();
  3. }

从加锁的过程来看,lock方法调用了sync的lock方法,而Sync为抽象类,没有实现lock方法

  1. // Sync类的lock方法
  2. abstract void lock();

公平锁的加锁过程

那么ReentrantLock的lock方法一定调用的是其子类NonfairSync或FairSync类的lock方法(也就是说加锁的过程与ReentractLock采用的策略有关系),首先分析FairSync(公平锁)的加锁过程。

  1. // FairSync(公平锁)类的lock方法
  2. final void lock() {
  3. acquire(1);
  4. }
  5. // AQS的acquire方法
  6. public final void acquire(int arg) {
  7. if (!tryAcquire(arg) &&
  8. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  9. selfInterrupt();
  10. }

从上面的源码可以看出FairSync类的lock方法调用了AQS的acquire方法,而acquire方法调用了tryAcquire方法的落地实现依旧在FairSync类中,在分析tryAcquire方法前先分析一下AQS的内部属性

AQS内部定义了一个Node类,Node中有两个Node类的属性prev和next,因此Node类应该是一个双向链表,AQS中定义了Node类的head和tail,那么AQS的内部维护了一个Node类的双向链表,Node类中还有一个nextWaiter属性(区别于prev和next,nextWaiter只有单向的引用),分析应该是一个单链表,Node类中的thread属性表示Node封装的线程,waitStatus表示Node的状态。

在AQS中有一个int类型的state属性,根据锁的实现不同有不同的含义,在ReentractLock中state表示锁的可重入次数,在ReentractReadWriteLock中state的高16位表示读锁的获取次数,低16位则表示写锁的可重入次数。如果自己使用AQS实现一把锁,那么也可以自定义state的表示含义。AQS继承了AbstractOwnableSynchronizer的exclusiveOwnerThread属性表示持有锁的线程,用于可重入的判断。
分析完AQS的属性后,再看FairSync类的tryAcquire方法

  1. protected final boolean tryAcquire(int acquires) {
  2. // 获取当前线程
  3. final Thread current = Thread.currentThread();
  4. // 获取锁的状态
  5. int c = getState();
  6. if (c == 0) { //c == 0表示锁未被其他线程获取,可以加锁
  7. if (!hasQueuedPredecessors() && // 判断自己是否需要排队
  8. compareAndSetState(0, acquires)) { // CAS修改锁的状态--加锁
  9. setExclusiveOwnerThread(current); // 设置持有锁的线程为当前线程
  10. return true; // 加锁成功
  11. }
  12. }
  13. else if (current == getExclusiveOwnerThread()) { // 当前线程持有锁,可重入
  14. int nextc = c + acquires; // 设置重入次数
  15. if (nextc < 0) // 判断重入次数不能超过int范围(不能溢出)
  16. throw new Error("Maximum lock count exceeded");
  17. setState(nextc); // 设置重入次数,此处未使用cas操作,因为当前线程持有锁,所以此操作线程安全
  18. return true; // 设置重入成功
  19. }
  20. return false; //既不能加锁又不能重入,无法获取到锁
  21. }

如果执行的线程可以获取到锁,tryAcquire返回true并且设置state变量加1,则AQS的acquire方法结束,lock方法结束,加锁成功。在判断自己是否需要排队时调用了hasQueuedPredecessors方法。

  1. public final boolean hasQueuedPredecessors() {
  2. // The correctness of this depends on head being initialized
  3. // before tail and on head.next being accurate if the current
  4. // thread is first in queue.
  5. Node t = tail; // Read fields in reverse initialization order
  6. Node h = head;
  7. Node s;
  8. return h != t &&
  9. ((s = h.next) == null || s.thread != Thread.currentThread());
  10. }

hasQueuedPredecessors方法取得了AQS的tail和head,当没有其他线程加锁时,tail和head的值都为null,即h != t返回false,方法返回false,表示自己不需要排队可以直接加锁。分析到这里可以发现如果执行环境为单线程或线程间交替执行,那么ReentrantLock的加锁仅仅是改变AQS中state属性的值,并不涉及操作系统用户态与内核态的切换,因此效率较高。
如果此时线程t1加锁成功并持有锁未释放,线程t2请求加锁呢?继续走上面的acquire方法,tryAcquire(int acquires)直接返回false,则执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg),首先分析addWaiter方法。

  1. private Node addWaiter(Node mode) {
  2. Node node = new Node(Thread.currentThread(), mode);// 将获取锁失败的线程封装成Node
  3. // Try the fast path of enq; backup to full enq on failure
  4. Node pred = tail; // 取得AQS的尾节点
  5. if (pred != null) { // 尾节点不为空,AQS已经初始化过了
  6. node.prev = pred; // 设置当前封装好节点的上一个节点为尾节点
  7. if (compareAndSetTail(pred, node)) { // CAS设置尾节点为刚刚封装好的节点
  8. pred.next = node; // CAS成功则设置前尾节点的下一个节点为新设置的尾节点
  9. return node; // 返回新节点(此时等同于尾节点)
  10. }
  11. }
  12. enq(node); // 尾节点为空(队列未初始化)或CAS设置尾节点失败了,则执行enq
  13. return node; // 返回新节点(此时等同于尾节点)
  14. }
  15. private Node enq(final Node node) {
  16. for (;;) { // 此处执行死循环
  17. Node t = tail; // 取得尾节点
  18. if (t == null) { // 尾节点为空(队列未初始化)
  19. if (compareAndSetHead(new Node())) // cas设置尾节点为 nwe Node()
  20. tail = head; // 设置头节点指向尾节点
  21. } else { // 尾节点不为空,AQS已经初始化过了
  22. node.prev = t; // CAS设置新节点加入队列并成为尾节点
  23. if (compareAndSetTail(t, node)) {
  24. t.next = node;
  25. return t;
  26. }
  27. }
  28. }
  29. }

当线程t2执行完addWaiter方法后,初始化AQS中的Node队列,并设置将t2封装成Node入队。

由于此时t2已经入队,那么应该将线程t2设置为休眠,等待线程t1释放锁后唤醒继续执行,那么源码是这么实现的吗?我们来分析一下acquireQueued方法。

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;
  3. try {
  4. boolean interrupted = false; // 线程是否被中断,记录标志
  5. for (;;) { // 此处执行死循环
  6. final Node p = node.predecessor(); // 获取上一个节点
  7. if (p == head && tryAcquire(arg)) { // 如果上一个节点为头节点,并且自己获取锁成功
  8. setHead(node); // 将自己设置为头节点
  9. p.next = null; // 设置原头节点的next节点为空
  10. failed = false;
  11. return interrupted; // 返回中断标志
  12. }
  13. if (shouldParkAfterFailedAcquire(p, node) && // 获取锁失败,开始休眠
  14. parkAndCheckInterrupt()) // 检查中断标志
  15. interrupted = true; // 恢复用户的中断
  16. }
  17. } finally {
  18. if (failed)
  19. cancelAcquire(node);
  20. }
  21. }
  22. // Node内部类方法,获取上一个节点
  23. final Node predecessor() throws NullPointerException {
  24. Node p = prev;
  25. if (p == null)
  26. throw new NullPointerException();
  27. else
  28. return p;
  29. }

shouldParkAfterFailedAcquire方法决定着自旋次数为2次

  1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  2. int ws = pred.waitStatus; // 取得上一节点的waitStatus
  3. if (ws == Node.SIGNAL)
  4. /*
  5. * This node has already set status asking a release
  6. * to signal it, so it can safely park.
  7. */
  8. return true;
  9. if (ws > 0) {
  10. /*
  11. * Predecessor was cancelled. Skip over predecessors and
  12. * indicate retry.
  13. */
  14. do {
  15. node.prev = pred = pred.prev;
  16. } while (pred.waitStatus > 0);
  17. pred.next = node;
  18. } else {
  19. /*
  20. * waitStatus must be 0 or PROPAGATE. Indicate that we
  21. * need a signal, but don't park yet. Caller will need to
  22. * retry to make sure it cannot acquire before parking.
  23. */
  24. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  25. }
  26. return false;
  27. }

当线程t2执行到acquireQueued方法出,node的上一级节点为头节点,如果此时当前线程尝试获取锁成功,则将自己设置为头节点并退出,如果获取锁失败,则执行shouldParkAfterFailedAcquire方法将其上一节点的waitStatus使用CAS设置为-1,如果CAS失败了,由于在for死循环中,则继续尝试获取锁,失败后继续使用CAS修改上一节点waitStatus的状态,直到获取锁成功或CAS修改状态成功,如果CAS修改状态成功了,则返回for循环尝试最后一次获取锁,如果还是获取锁失败,则继续执行shouldParkAfterFailedAcquire方法,此时方法返回true,则执行parkAndCheckInterrupt方法将线程阻塞挂起。因此即上面的问题,线程被封装为Node加入队列后并不是立即阻塞挂起,而是判断自己是不是头节点的后一个节点,如果自己是头节点的后一个节点,则自旋尝试获取锁至少两次,失败后被阻塞挂起,此时的Node队列如下。

而调用parkAndCheckInterrupt后,线程被阻塞在了LockSupport.park(this);处。

  1. private final boolean parkAndCheckInterrupt() {
  2. LockSupport.park(this);// 线程被阻塞在此处
  3. return Thread.interrupted(); // 返回并清空中断标志
  4. }

如果线程在阻塞挂起期间被用户设置了中断标志,则线程被唤醒后继续在return Thread.interrupted();处执行,检测是否设置了中断标志,如果设置了中断标志,则parkAndCheckInterrupt()方法返回true, 此时中断标志被清空。执行完parkAndCheckInterrupt方法后应回到for死循环出继续执行interrupted = true;,表明用户设置了中断,然后继续for循环获取锁,此时由于t1将锁释放了,那么t2应该获取锁成功,随后将自己设置为头节点并返回true,acquireQueued方法结束,进入AQS的acquire方法中,if判断!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)为true,执行selfInterrupt();方法恢复用户设置的中断。

如果t1未释放锁,t2还阻塞在队列中,此时线程t3调用了lock方法,t3既不是得到锁的线程也不是队列中的第二个线程,调用hasQueuedPredecessors()方法一定会返回false,那么tryAcquire()方法一定会返回false。此时调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法将t3加入队列并将其阻塞于LockSupport.park(this);处。

非公平锁的加锁过程

非公平锁的加锁调用的是NonfairSync类的lock方法

  1. final void lock() {
  2. if (compareAndSetState(0, 1)) //CAS设置state的状态
  3. setExclusiveOwnerThread(Thread.currentThread()); //设置持有锁线程为当前线程,加锁成功
  4. else // state的状态设置失败
  5. acquire(1);
  6. }

当使用CAS设置state的状态失败后调用AQS的acquire方法如上所示,但是tryAcquire方法则是由NonfairSync类自己实现的

  1. protected final boolean tryAcquire(int acquires) {
  2. return nonfairTryAcquire(acquires);
  3. }

tryAcquire方法调用了Sync类的nonfairTryAcquire方法

  1. final boolean nonfairTryAcquire(int acquires) {
  2. final Thread current = Thread.currentThread();
  3. int c = getState();
  4. if (c == 0) {
  5. if (compareAndSetState(0, acquires)) {
  6. setExclusiveOwnerThread(current);
  7. return true;
  8. }
  9. }
  10. else if (current == getExclusiveOwnerThread()) {
  11. int nextc = c + acquires;
  12. if (nextc < 0) // overflow
  13. throw new Error("Maximum lock count exceeded");
  14. setState(nextc);
  15. return true;
  16. }
  17. return false;
  18. }

nonfairTryAcquire方法与公平锁的tryAcquire方法类似,但是少了hasQueuedPredecessors(),也就是说非公平所并不需要判断自己是否有获取锁的资格,直接尝试获取锁,其他过程都是调用AQS内部的方法,与公平锁的加锁过程相同。

解锁的过程

ReentractLock调用unlock方法进行解锁

  1. public void unlock() {
  2. sync.release(1);
  3. }

unlock方法内部调用了AQS内的release(1)方法

  1. public final boolean release(int arg) {
  2. if (tryRelease(arg)) { //尝试进行解锁,由ReentractLock内的Sync实现
  3. Node h = head; // 取得头节点
  4. if (h != null && h.waitStatus != 0) // 头节点不为空,且队列中存在线程排队等待唤醒
  5. unparkSuccessor(h); //唤醒阻塞的线程,传入节点为头节点
  6. return true;
  7. }
  8. return false;
  9. }

Sync实现了tryRelease方法

  1. protected final boolean tryRelease(int releases) {
  2. int c = getState() - releases; // 取得释放锁后的state的值
  3. if (Thread.currentThread() != getExclusiveOwnerThread()) // 如果当前线程不是持有锁的线程,抛出异常
  4. throw new IllegalMonitorStateException();
  5. boolean free = false; // 判断锁是否可以被其他线程获取
  6. if (c == 0) { // 如果释放锁后state的值为0,则锁可以被其他线程获取
  7. free = true;
  8. setExclusiveOwnerThread(null); // 清除持有锁的线程
  9. }
  10. setState(c); // 设置state的值,此处因为线程持有锁,因此不需要cas操作
  11. return free;
  12. }

由上面的代码可知,如果线程多次使用了lock()方法(既锁的重入次数大于1),那么调用release方法后,只是将AQS中state的值减一,线程依旧持有锁。如果锁的重入次数为1,那么调用release方法后,清空了占有锁的线程,并判断AQS队列中是否存在线程排队等待唤醒,如果存在的话则调用unparkSuccessor方法唤醒线程。

  1. private void unparkSuccessor(Node node) {
  2. /*
  3. * If status is negative (i.e., possibly needing signal) try
  4. * to clear in anticipation of signalling. It is OK if this
  5. * fails or if status is changed by waiting thread.
  6. */
  7. int ws = node.waitStatus; // 取得节点waitStatus的值
  8. if (ws < 0) // 如果waitStatus的值小于0,则使用cas将其设置为0
  9. compareAndSetWaitStatus(node, ws, 0);
  10. /*
  11. * Thread to unpark is held in successor, which is normally
  12. * just the next node. But if cancelled or apparently null,
  13. * traverse backwards from tail to find the actual
  14. * non-cancelled successor.
  15. */
  16. Node s = node.next; // 取得下一个节点
  17. if (s == null || s.waitStatus > 0) { // 如果下一个节点不存在或者对应节点的线程被取消了
  18. s = null;
  19. for (Node t = tail; t != null && t != node; t = t.prev)// 从尾节点向前找节点直到头节点
  20. if (t.waitStatus <= 0) // 找到一个需要被唤醒的节点
  21. s = t;
  22. }
  23. if (s != null) // 如果找到了需要被唤醒的线程
  24. LockSupport.unpark(s.thread); // 唤醒线程到park处继续执行
  25. }

以上面的t3阻塞在队列中为例,假如此时线程t1调用unlock方法释放了锁,由于线程t1只调用了一次lock方法,因此调用tryRelease方法一定会返回true,AQS队列的头节点不为空,且队列中存在线程t2,t3排队等待唤醒,因此执行unparkSuccessor(h);方法并传入头节点。取得头节点的下一节点为封装线程t2的节点,该节点不为空且waitStatus=-1,因此直接执行LockSupport.unpark(s.thread);方法唤醒包裹在node节点中的线程t2。t2被唤醒后回到LockSupport.park(this);处继续执行代码,返回线程的中断标志并将其清除,返回到acquireQueued方法的for循环中继续执行

  1. if (shouldParkAfterFailedAcquire(p, node) &&
  2. parkAndCheckInterrupt())

判断线程在休眠期间是否被外部设置了中断,如果设置了中断则设置变量interrupted为true,继续for循环取得该节点的上一节点为头节点,设置当前节点为头节点,并断开原头结点的引用链接,随后返回interrupted,随后恢复acquire(int arg)方法的执行,如果设置了中断则调用selfInterrupt();方法回复中断后返回,否则直接返回,随后lock方法结束。

条件队列

Node中还存在一个属性nextWaiter,也是使用Node进行定义的

  1. /**
  2. * Link to next node waiting on condition, or the special
  3. * value SHARED. Because condition queues are accessed only
  4. * when holding in exclusive mode, we just need a simple
  5. * linked queue to hold nodes while they are waiting on
  6. * conditions. They are then transferred to the queue to
  7. * re-acquire. And because conditions can only be exclusive,
  8. * we save a field by using special value to indicate shared
  9. * mode.
  10. */
  11. Node nextWaiter;

对比于synchronized关键字中的waitnotify,使用AQS的awaitsignal可以实现与之类似的操作,区别在于wait方法只能对一个共享变量进行同步,而AQS可以通过使用一把锁创造多个条件变量(即对多个对象同步)。
在ReentrantLock中有一个newCondition方法可以创建一个条件变量

  1. final ConditionObject newCondition() {
  2. return new ConditionObject();
  3. }

而ConditionObject是AQS的内部类,即AQS的内部结构如下所示

在ConditionObject中定义了两个Node类型的属性firstWaiter,lastWaiter,其中firstWaiter用于记录调用了ConditionObject类实例的await方法后阻塞在该条件变量的阻塞队列的第一个线程对应的Node,lastWaiter记录最后一个阻塞线程对应的Node。与上面Node节点的nextWaiter属性呼应(即条件变量的阻塞队列是一个由Node节点形成的单链表)。
调用ConditionObject类实例的await方法可以使当前线程被封装为Node对象,放入该ConditionObject类实例的阻塞队列中。

  1. public final void await() throws InterruptedException {
  2. if (Thread.interrupted()) //如果当前线程被设置了中断标志,抛出中断异常
  3. throw new InterruptedException();
  4. Node node = addConditionWaiter(); // 设置当前线程封为Node并加入队列
  5. int savedState = fullyRelease(node);// 释放当前线程获取到的锁
  6. int interruptMode = 0;
  7. while (!isOnSyncQueue(node)) { // 调用park方法将线程阻塞挂起,使用while循环是为了防止虚假唤醒
  8. LockSupport.park(this);
  9. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  10. break;
  11. }
  12. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  13. interruptMode = REINTERRUPT;
  14. if (node.nextWaiter != null) // clean up if cancelled
  15. unlinkCancelledWaiters();
  16. if (interruptMode != 0)
  17. reportInterruptAfterWait(interruptMode);
  18. }

调用signal方法会将阻塞在条件队列中的node移动至AQS队列中

  1. public final void signal() {
  2. if (!isHeldExclusively()) // 判断线程是否获取到了锁
  3. throw new IllegalMonitorStateException();
  4. Node first = firstWaiter;
  5. if (first != null)
  6. doSignal(first); // 将第一个阻塞队列的Node节点移动至AQS队列
  7. }

相关文章