从ReentrantLock到AQS源码阅读

x33g5p2x  于2022-02-07 转载在 其他  
字(10.9k)|赞(0)|评价(0)|浏览(436)

ReentrantLock是使用AQS实现的一种可重入的独占锁,并且提供了公平和非公平两种策略。

ReentractLock的内部类Sync继承了AbstractQueuedSynchronizer抽象类,并且提供了两个内部类FairSync和NonfairSync(即公平锁与非公平锁的落地实现),而具体采用哪一种加锁策略则则是由ReentractLock的构造函数进行指定。

public ReentrantLock() {
    sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

从上面的构造函数可以看到ReentrantLock默认使用的是非公平锁,传入true启用公平锁,传入false则是非公平锁

public void lock() {
    sync.lock();
}

从加锁的过程来看,lock方法调用了sync的lock方法,而Sync为抽象类,没有实现lock方法

// Sync类的lock方法
abstract void lock();

公平锁的加锁过程

那么ReentrantLock的lock方法一定调用的是其子类NonfairSync或FairSync类的lock方法(也就是说加锁的过程与ReentractLock采用的策略有关系),首先分析FairSync(公平锁)的加锁过程。

// FairSync(公平锁)类的lock方法
final void lock() {
    acquire(1);
}
// AQS的acquire方法
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

从上面的源码可以看出FairSync类的lock方法调用了AQS的acquire方法,而acquire方法调用了tryAcquire方法的落地实现依旧在FairSync类中,在分析tryAcquire方法前先分析一下AQS的内部属性

AQS内部定义了一个Node类,Node中有两个Node类的属性prev和next,因此Node类应该是一个双向链表,AQS中定义了Node类的head和tail,那么AQS的内部维护了一个Node类的双向链表,Node类中还有一个nextWaiter属性(区别于prev和next,nextWaiter只有单向的引用),分析应该是一个单链表,Node类中的thread属性表示Node封装的线程,waitStatus表示Node的状态。

在AQS中有一个int类型的state属性,根据锁的实现不同有不同的含义,在ReentractLock中state表示锁的可重入次数,在ReentractReadWriteLock中state的高16位表示读锁的获取次数,低16位则表示写锁的可重入次数。如果自己使用AQS实现一把锁,那么也可以自定义state的表示含义。AQS继承了AbstractOwnableSynchronizer的exclusiveOwnerThread属性表示持有锁的线程,用于可重入的判断。
分析完AQS的属性后,再看FairSync类的tryAcquire方法

protected final boolean tryAcquire(int acquires) {
    // 获取当前线程
    final Thread current = Thread.currentThread();
    // 获取锁的状态
    int c = getState();
    if (c == 0) { //c == 0表示锁未被其他线程获取,可以加锁
        if (!hasQueuedPredecessors() && // 判断自己是否需要排队
            compareAndSetState(0, acquires)) {  // CAS修改锁的状态--加锁
            setExclusiveOwnerThread(current);   // 设置持有锁的线程为当前线程
            return true; // 加锁成功
        }
    }
    else if (current == getExclusiveOwnerThread()) { // 当前线程持有锁,可重入
         int nextc = c + acquires; // 设置重入次数
         if (nextc < 0) // 判断重入次数不能超过int范围(不能溢出)
             throw new Error("Maximum lock count exceeded");
             setState(nextc); // 设置重入次数,此处未使用cas操作,因为当前线程持有锁,所以此操作线程安全
             return true; // 设置重入成功
    }
    return false; //既不能加锁又不能重入,无法获取到锁
}

如果执行的线程可以获取到锁,tryAcquire返回true并且设置state变量加1,则AQS的acquire方法结束,lock方法结束,加锁成功。在判断自己是否需要排队时调用了hasQueuedPredecessors方法。

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

hasQueuedPredecessors方法取得了AQS的tail和head,当没有其他线程加锁时,tail和head的值都为null,即h != t返回false,方法返回false,表示自己不需要排队可以直接加锁。分析到这里可以发现如果执行环境为单线程或线程间交替执行,那么ReentrantLock的加锁仅仅是改变AQS中state属性的值,并不涉及操作系统用户态与内核态的切换,因此效率较高。
如果此时线程t1加锁成功并持有锁未释放,线程t2请求加锁呢?继续走上面的acquire方法,tryAcquire(int acquires)直接返回false,则执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg),首先分析addWaiter方法。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);// 将获取锁失败的线程封装成Node
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail; // 取得AQS的尾节点
    if (pred != null) { // 尾节点不为空,AQS已经初始化过了
        node.prev = pred; // 设置当前封装好节点的上一个节点为尾节点
        if (compareAndSetTail(pred, node)) { // CAS设置尾节点为刚刚封装好的节点
            pred.next = node; // CAS成功则设置前尾节点的下一个节点为新设置的尾节点
            return node; // 返回新节点(此时等同于尾节点)
        }
    }
    enq(node); // 尾节点为空(队列未初始化)或CAS设置尾节点失败了,则执行enq
    return node; // 返回新节点(此时等同于尾节点)
}

private Node enq(final Node node) {
    for (;;) { // 此处执行死循环
        Node t = tail; // 取得尾节点
        if (t == null) { // 尾节点为空(队列未初始化)
            if (compareAndSetHead(new Node())) // cas设置尾节点为 nwe Node()
                tail = head; // 设置头节点指向尾节点
        } else { // 尾节点不为空,AQS已经初始化过了
            node.prev = t; // CAS设置新节点加入队列并成为尾节点
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

当线程t2执行完addWaiter方法后,初始化AQS中的Node队列,并设置将t2封装成Node入队。

由于此时t2已经入队,那么应该将线程t2设置为休眠,等待线程t1释放锁后唤醒继续执行,那么源码是这么实现的吗?我们来分析一下acquireQueued方法。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false; // 线程是否被中断,记录标志
        for (;;) { // 此处执行死循环
            final Node p = node.predecessor(); // 获取上一个节点
            if (p == head && tryAcquire(arg)) { // 如果上一个节点为头节点,并且自己获取锁成功
                setHead(node);  // 将自己设置为头节点
                p.next = null; // 设置原头节点的next节点为空
                failed = false;
                return interrupted; // 返回中断标志
            }
            if (shouldParkAfterFailedAcquire(p, node) &&  // 获取锁失败,开始休眠
                parkAndCheckInterrupt()) // 检查中断标志
                interrupted = true;  // 恢复用户的中断
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

// Node内部类方法,获取上一个节点
final Node predecessor() throws NullPointerException {
    Node p = prev;
    if (p == null)
        throw new NullPointerException();
    else
        return p;
}

shouldParkAfterFailedAcquire方法决定着自旋次数为2次

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; // 取得上一节点的waitStatus
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

当线程t2执行到acquireQueued方法出,node的上一级节点为头节点,如果此时当前线程尝试获取锁成功,则将自己设置为头节点并退出,如果获取锁失败,则执行shouldParkAfterFailedAcquire方法将其上一节点的waitStatus使用CAS设置为-1,如果CAS失败了,由于在for死循环中,则继续尝试获取锁,失败后继续使用CAS修改上一节点waitStatus的状态,直到获取锁成功或CAS修改状态成功,如果CAS修改状态成功了,则返回for循环尝试最后一次获取锁,如果还是获取锁失败,则继续执行shouldParkAfterFailedAcquire方法,此时方法返回true,则执行parkAndCheckInterrupt方法将线程阻塞挂起。因此即上面的问题,线程被封装为Node加入队列后并不是立即阻塞挂起,而是判断自己是不是头节点的后一个节点,如果自己是头节点的后一个节点,则自旋尝试获取锁至少两次,失败后被阻塞挂起,此时的Node队列如下。

而调用parkAndCheckInterrupt后,线程被阻塞在了LockSupport.park(this);处。

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);// 线程被阻塞在此处
    return Thread.interrupted(); // 返回并清空中断标志
}

如果线程在阻塞挂起期间被用户设置了中断标志,则线程被唤醒后继续在return Thread.interrupted();处执行,检测是否设置了中断标志,如果设置了中断标志,则parkAndCheckInterrupt()方法返回true, 此时中断标志被清空。执行完parkAndCheckInterrupt方法后应回到for死循环出继续执行interrupted = true;,表明用户设置了中断,然后继续for循环获取锁,此时由于t1将锁释放了,那么t2应该获取锁成功,随后将自己设置为头节点并返回true,acquireQueued方法结束,进入AQS的acquire方法中,if判断!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)为true,执行selfInterrupt();方法恢复用户设置的中断。

如果t1未释放锁,t2还阻塞在队列中,此时线程t3调用了lock方法,t3既不是得到锁的线程也不是队列中的第二个线程,调用hasQueuedPredecessors()方法一定会返回false,那么tryAcquire()方法一定会返回false。此时调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法将t3加入队列并将其阻塞于LockSupport.park(this);处。

非公平锁的加锁过程

非公平锁的加锁调用的是NonfairSync类的lock方法

final void lock() {
    if (compareAndSetState(0, 1)) //CAS设置state的状态
        setExclusiveOwnerThread(Thread.currentThread()); //设置持有锁线程为当前线程,加锁成功
    else // state的状态设置失败
        acquire(1);
}

当使用CAS设置state的状态失败后调用AQS的acquire方法如上所示,但是tryAcquire方法则是由NonfairSync类自己实现的

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

tryAcquire方法调用了Sync类的nonfairTryAcquire方法

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

nonfairTryAcquire方法与公平锁的tryAcquire方法类似,但是少了hasQueuedPredecessors(),也就是说非公平所并不需要判断自己是否有获取锁的资格,直接尝试获取锁,其他过程都是调用AQS内部的方法,与公平锁的加锁过程相同。

解锁的过程

ReentractLock调用unlock方法进行解锁

public void unlock() {
    sync.release(1);
}

unlock方法内部调用了AQS内的release(1)方法

public final boolean release(int arg) {
    if (tryRelease(arg)) { //尝试进行解锁,由ReentractLock内的Sync实现
        Node h = head;  // 取得头节点
        if (h != null && h.waitStatus != 0) // 头节点不为空,且队列中存在线程排队等待唤醒
            unparkSuccessor(h); //唤醒阻塞的线程,传入节点为头节点
        return true;
    }
    return false;
}

Sync实现了tryRelease方法

protected final boolean tryRelease(int releases) {
    int c = getState() - releases; // 取得释放锁后的state的值
    if (Thread.currentThread() != getExclusiveOwnerThread()) // 如果当前线程不是持有锁的线程,抛出异常
        throw new IllegalMonitorStateException();
    boolean free = false; // 判断锁是否可以被其他线程获取
    if (c == 0) { // 如果释放锁后state的值为0,则锁可以被其他线程获取
        free = true;
        setExclusiveOwnerThread(null); // 清除持有锁的线程
    }
    setState(c); // 设置state的值,此处因为线程持有锁,因此不需要cas操作
    return free;
}

由上面的代码可知,如果线程多次使用了lock()方法(既锁的重入次数大于1),那么调用release方法后,只是将AQS中state的值减一,线程依旧持有锁。如果锁的重入次数为1,那么调用release方法后,清空了占有锁的线程,并判断AQS队列中是否存在线程排队等待唤醒,如果存在的话则调用unparkSuccessor方法唤醒线程。

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus; // 取得节点waitStatus的值
    if (ws < 0) // 如果waitStatus的值小于0,则使用cas将其设置为0
        compareAndSetWaitStatus(node, ws, 0);

    /*
    * Thread to unpark is held in successor, which is normally
    * just the next node.  But if cancelled or apparently null,
    * traverse backwards from tail to find the actual
    * non-cancelled successor.
    */
    Node s = node.next; // 取得下一个节点
    if (s == null || s.waitStatus > 0) { // 如果下一个节点不存在或者对应节点的线程被取消了
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)// 从尾节点向前找节点直到头节点
           if (t.waitStatus <= 0) // 找到一个需要被唤醒的节点
                s = t;
    }
    if (s != null) // 如果找到了需要被唤醒的线程
        LockSupport.unpark(s.thread); // 唤醒线程到park处继续执行
}

以上面的t3阻塞在队列中为例,假如此时线程t1调用unlock方法释放了锁,由于线程t1只调用了一次lock方法,因此调用tryRelease方法一定会返回true,AQS队列的头节点不为空,且队列中存在线程t2,t3排队等待唤醒,因此执行unparkSuccessor(h);方法并传入头节点。取得头节点的下一节点为封装线程t2的节点,该节点不为空且waitStatus=-1,因此直接执行LockSupport.unpark(s.thread);方法唤醒包裹在node节点中的线程t2。t2被唤醒后回到LockSupport.park(this);处继续执行代码,返回线程的中断标志并将其清除,返回到acquireQueued方法的for循环中继续执行

if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())

判断线程在休眠期间是否被外部设置了中断,如果设置了中断则设置变量interrupted为true,继续for循环取得该节点的上一节点为头节点,设置当前节点为头节点,并断开原头结点的引用链接,随后返回interrupted,随后恢复acquire(int arg)方法的执行,如果设置了中断则调用selfInterrupt();方法回复中断后返回,否则直接返回,随后lock方法结束。

条件队列

Node中还存在一个属性nextWaiter,也是使用Node进行定义的

/**
* Link to next node waiting on condition, or the special
* value SHARED.  Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;

对比于synchronized关键字中的waitnotify,使用AQS的awaitsignal可以实现与之类似的操作,区别在于wait方法只能对一个共享变量进行同步,而AQS可以通过使用一把锁创造多个条件变量(即对多个对象同步)。
在ReentrantLock中有一个newCondition方法可以创建一个条件变量

final ConditionObject newCondition() {
    return new ConditionObject();
}

而ConditionObject是AQS的内部类,即AQS的内部结构如下所示

在ConditionObject中定义了两个Node类型的属性firstWaiter,lastWaiter,其中firstWaiter用于记录调用了ConditionObject类实例的await方法后阻塞在该条件变量的阻塞队列的第一个线程对应的Node,lastWaiter记录最后一个阻塞线程对应的Node。与上面Node节点的nextWaiter属性呼应(即条件变量的阻塞队列是一个由Node节点形成的单链表)。
调用ConditionObject类实例的await方法可以使当前线程被封装为Node对象,放入该ConditionObject类实例的阻塞队列中。

public final void await() throws InterruptedException {
    if (Thread.interrupted()) //如果当前线程被设置了中断标志,抛出中断异常
        throw new InterruptedException();
    Node node = addConditionWaiter(); // 设置当前线程封为Node并加入队列
    int savedState = fullyRelease(node);// 释放当前线程获取到的锁
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) { // 调用park方法将线程阻塞挂起,使用while循环是为了防止虚假唤醒
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
              break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

调用signal方法会将阻塞在条件队列中的node移动至AQS队列中

public final void signal() {
    if (!isHeldExclusively()) // 判断线程是否获取到了锁
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first); // 将第一个阻塞队列的Node节点移动至AQS队列
}

相关文章