并发编程系列之AQS实现原理

x33g5p2x  于2022-01-13 转载在 其他  
字(5.2k)|赞(0)|评价(0)|浏览(427)

并发编程系列之AQS实现原理

1、什么是AQS?

AQS(AbstractQueuedSynchronizer),抽象队列同步器,是juc中很多Lock锁和同步组件的基础,比如CountDownLatch、ReentrantLock、ReentrantReadWriteLock、Semaphore等等,提供了对资源的占用、释放,线程的等待、唤醒等等接口或具体实现,可以用在各种需要控制资源竞争的场景中。

2、理清AQS整体架构

看一下juc包里的AbstractQueuedSynchronizer的源码:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
	//头结点
    private transient volatile Node head;
    //尾节点
    private transient volatile Node tail;
    //volatile修饰的信号量
    private volatile int state;
    
    // 链表的Node节点
 static final class Node {
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
    }
	// ...
}

// AbstractQueuedSynchronizer父类
public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {
 private transient Thread exclusiveOwnerThread;
	// ...
}

AQS主要由如下组件构成,最主要的还是信号量stata和CHL队列

2.1、state信号量

源码里,state是由volatile关键字修饰的,在前面的学习中,可以知道并发编程的三大特性是:原子性、有序性、可见性。而volatile修饰的变量,可以保证有序性和可见性。

volatile保证可见性

  • 写state值,volatile特性会将数据同步到主内存
  • 读state值,volatile特性会重新从主内存加载数据到工作内存

这个涉及到JMM,不熟悉的读者可以翻一下我之前博客

cpu、jvm、内存都会在代码执行过程进行指令重排序,重排序过程,可能出现一些程序异常,所以,要保证有序性,可以加上volatile关键字,volatile避免重排序依赖于操作系统的内存屏障

前面说明了加volatile的原因,然后还有一个特性,原子性,volatile只能保证单个变量的原子性,并不能保证一系列操作的原子性,所以不是线程安全的,然后AQS里是怎么保证线程安全的?然后AQS源码里怎么实现的?翻下源码,找到两个地方:

这里就是很常见的CAS锁,利用了unsafe的api,利用cpu操作的原子性保证这一操作的原子性

protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

这个方法,直接将一个newStata设置,因为修饰的是一个变量,对基本类型的变量进行直接赋值时,是可以保证原子性的

protected final void setState(int newState) {
    state = newState;
}
2.2、FIFO队列

AQS 作者 Doug Lea 给出的一份AQS文档:http://gee.cs.oswego.edu/dl/papers/aqs.pdf,里面对AQS做了比较详细的描述,英文水平比较好的读者可以看看

CHL队列是AQS的一个核心,FIFO 队列,即先进先出队列,这个队列的主要作用是存储等待的线程,假设很多线程去抢锁,大部分线程是抢不到锁的,所以这些线程需要有个队列来存储,这个队列就是CHL队列,同样遵循FIFO规则。

CHL队列内部结构是一个双向链表的数据结构,基本组件是Node节点,队列中,分别用 head 和 tail 来表示头节点和尾节点,两者在初始化的时候都指向了一个空节点,head节点的线程就可以持有锁,然后后面的线程排队等待,如图所示

2.3、exclusiveOwnerThread

AbstractQueuedSynchronizer类extends AbstractOwnableSynchronizer类,AbstractOwnableSynchronizerexclusiveOwnerThread这个定义的线程对象,表示独占模式下同步器的持有者。

3、AQS实现原理

前面已经对AQS的整体设计有了一个比较清晰的了解,主要关注点是信号量state和CHL队列,然后可以跟下源码实现,@see java.util.concurrent.locks.AbstractOwnableSynchronizer,看源码不需要一点点看,看下主要的方法和类即可,理清框架的实现原理就可以,AQS主要关注*acquire**release*这些方法就可以,分为两种类型方法,一种是独占式,另外一种是共享式,共享式方法都有加上一个shared后缀

  • acquireacquireShared:定义了资源争用的逻辑,如果没拿到,就等待
  • tryAcquiretryAcquireShared:实际执行占用资源的操作,由具体的AQS使用者实现
  • releasereleasedShared:定义了资源释放的逻辑,资源释放之后,调整队列后续节点继续争抢
  • tryReleasetryReleaseShared:实际执行资源释放的操作,具体的由AQS使用者实现
3.1、独占式

下面跟下AQS源码,注意对比独占式和共享式的实现方式有什么区别

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire方法总体思路:

  • tryAcquire()方法尝试去获取资源,如果成功直接返回
  • 如果获取失败,通过addWaiter方法将当前线程包装成Node插入到CLH队列尾部,独占的标记为Node.EXCLUSIVE
  • acquireQueued方法将线程阻塞在等待队列,直到获取到资源。整个等待过程,线程被中断过,返回true,否则返回false。获取到资源后而且被中断过,就会调用selfInterrupt将中断补上

tryAcquire方法由具体使用者类实现,不实现是会抛出异常的,所以必须实现

protected boolean tryAcquire(int arg) {
   throw new UnsupportedOperationException();
}

addWaiter方法作用,将当前线程包装成Node插入到CLH队列尾部

private Node addWaiter(Node mode) {
	// 将当前线程封装成Node节点
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    // 尾节点不能空,队列已经初始化过
    if (pred != null) {
        // Node节点的前prev指针指向尾节点
        node.prev = pred;
        // cas锁操作,保证线程安全
        if (compareAndSetTail(pred, node)) {
        	// 尾节点指针指向Node节点
            pred.next = node;
            return node;
        }
    }
    // 队列没初始化过,调用enq方法
    enq(node);
    return node;
}

跟一下enq方法

private Node enq(final Node node) {
    // 无限循环,类似于while(true),自旋操作
    for (;;) {
         // 尾节点
        Node t = tail;
        if (t == null) { // Must initialize
        	// 创建new一个节点,设置为头节点
            if (compareAndSetHead(new Node()))
            	// 头结点赋值给尾结点
                tail = head;
        } else {
       		// 如果尾节点不为空,将Node节点的前驱指向尾节点
            node.prev = t;
            // cas,将node节点设置为尾节点
            if (compareAndSetTail(t, node)) {
            	// 尾节点的next指针指向Node节点
                t.next = node;
                return t;
            }
        }
    }
}

再往上看一下java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire方法的acquireQueued方法,前面说到acquireQueued方法作用是,线程排队等待,直到获取到资源

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 自旋,无限循环
        for (;;) {
             // 寻找node节点前驱节点
            final Node p = node.predecessor();
            // 前驱节点是head节点,tryAcquire
            if (p == head && tryAcquire(arg)) {
            	// 获取资源成功,将node节点设置为head新的节点
                setHead(node);
                // 原来的head节点设置为null,等待GC垃圾回收
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // p不是头结点,或者tryAcquire()获取资源失败
            // shouldParkAfterFailedAcquire判断是否可以被park
            // parkAndCheckInterrupt判断park和Interrupt是否成功
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

3.2、共享式

共享式即共享资源可以被多个线程同时占有

先看顶层的acquireShared方法:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

tryAcquireShared方法是由具体使用者类去实现的,所以先看看doAcquireShared源码:

private void doAcquireShared(int arg) {
    // 封装为Node,标记为Node.SHARED
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 无限循环,自旋
        for (;;) {
            // 查找前驱节点
            final Node p = node.predecessor();
            // 是否为head节点
            if (p == head) {
            	// 尝试抢资源
                int r = tryAcquireShared(arg);
                if (r >= 0) { // 抢资源成功
                    // 设置为头节点
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
             // p不是头结点,或者tryAcquire()获取资源失败
            // shouldParkAfterFailedAcquire判断是否可以被park
            // parkAndCheckInterrupt判断park和Interrupt是否成功
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

这个逻辑和独占式基本差不多,不同的地方在于入队的Node是标志为SHARED共享式的,tryAcquireShared是由具体的类去实现

参考资料

相关文章