并发编程系列之AQS实现原理

x33g5p2x  于2022-01-13 转载在 其他  
字(5.2k)|赞(0)|评价(0)|浏览(489)

并发编程系列之AQS实现原理

1、什么是AQS?

AQS(AbstractQueuedSynchronizer),抽象队列同步器,是juc中很多Lock锁和同步组件的基础,比如CountDownLatch、ReentrantLock、ReentrantReadWriteLock、Semaphore等等,提供了对资源的占用、释放,线程的等待、唤醒等等接口或具体实现,可以用在各种需要控制资源竞争的场景中。

2、理清AQS整体架构

看一下juc包里的AbstractQueuedSynchronizer的源码:

  1. public abstract class AbstractQueuedSynchronizer
  2. extends AbstractOwnableSynchronizer
  3. implements java.io.Serializable {
  4. //头结点
  5. private transient volatile Node head;
  6. //尾节点
  7. private transient volatile Node tail;
  8. //volatile修饰的信号量
  9. private volatile int state;
  10. // 链表的Node节点
  11. static final class Node {
  12. volatile Node prev;
  13. volatile Node next;
  14. volatile Thread thread;
  15. }
  16. // ...
  17. }
  18. // AbstractQueuedSynchronizer父类
  19. public abstract class AbstractOwnableSynchronizer
  20. implements java.io.Serializable {
  21. private transient Thread exclusiveOwnerThread;
  22. // ...
  23. }

AQS主要由如下组件构成,最主要的还是信号量stata和CHL队列

2.1、state信号量

源码里,state是由volatile关键字修饰的,在前面的学习中,可以知道并发编程的三大特性是:原子性、有序性、可见性。而volatile修饰的变量,可以保证有序性和可见性。

volatile保证可见性

  • 写state值,volatile特性会将数据同步到主内存
  • 读state值,volatile特性会重新从主内存加载数据到工作内存

这个涉及到JMM,不熟悉的读者可以翻一下我之前博客

cpu、jvm、内存都会在代码执行过程进行指令重排序,重排序过程,可能出现一些程序异常,所以,要保证有序性,可以加上volatile关键字,volatile避免重排序依赖于操作系统的内存屏障

前面说明了加volatile的原因,然后还有一个特性,原子性,volatile只能保证单个变量的原子性,并不能保证一系列操作的原子性,所以不是线程安全的,然后AQS里是怎么保证线程安全的?然后AQS源码里怎么实现的?翻下源码,找到两个地方:

这里就是很常见的CAS锁,利用了unsafe的api,利用cpu操作的原子性保证这一操作的原子性

  1. protected final boolean compareAndSetState(int expect, int update) {
  2. return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
  3. }

这个方法,直接将一个newStata设置,因为修饰的是一个变量,对基本类型的变量进行直接赋值时,是可以保证原子性的

  1. protected final void setState(int newState) {
  2. state = newState;
  3. }
2.2、FIFO队列

AQS 作者 Doug Lea 给出的一份AQS文档:http://gee.cs.oswego.edu/dl/papers/aqs.pdf,里面对AQS做了比较详细的描述,英文水平比较好的读者可以看看

CHL队列是AQS的一个核心,FIFO 队列,即先进先出队列,这个队列的主要作用是存储等待的线程,假设很多线程去抢锁,大部分线程是抢不到锁的,所以这些线程需要有个队列来存储,这个队列就是CHL队列,同样遵循FIFO规则。

CHL队列内部结构是一个双向链表的数据结构,基本组件是Node节点,队列中,分别用 head 和 tail 来表示头节点和尾节点,两者在初始化的时候都指向了一个空节点,head节点的线程就可以持有锁,然后后面的线程排队等待,如图所示

2.3、exclusiveOwnerThread

AbstractQueuedSynchronizer类extends AbstractOwnableSynchronizer类,AbstractOwnableSynchronizerexclusiveOwnerThread这个定义的线程对象,表示独占模式下同步器的持有者。

3、AQS实现原理

前面已经对AQS的整体设计有了一个比较清晰的了解,主要关注点是信号量state和CHL队列,然后可以跟下源码实现,@see java.util.concurrent.locks.AbstractOwnableSynchronizer,看源码不需要一点点看,看下主要的方法和类即可,理清框架的实现原理就可以,AQS主要关注*acquire**release*这些方法就可以,分为两种类型方法,一种是独占式,另外一种是共享式,共享式方法都有加上一个shared后缀

  • acquireacquireShared:定义了资源争用的逻辑,如果没拿到,就等待
  • tryAcquiretryAcquireShared:实际执行占用资源的操作,由具体的AQS使用者实现
  • releasereleasedShared:定义了资源释放的逻辑,资源释放之后,调整队列后续节点继续争抢
  • tryReleasetryReleaseShared:实际执行资源释放的操作,具体的由AQS使用者实现
3.1、独占式

下面跟下AQS源码,注意对比独占式和共享式的实现方式有什么区别

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }

acquire方法总体思路:

  • tryAcquire()方法尝试去获取资源,如果成功直接返回
  • 如果获取失败,通过addWaiter方法将当前线程包装成Node插入到CLH队列尾部,独占的标记为Node.EXCLUSIVE
  • acquireQueued方法将线程阻塞在等待队列,直到获取到资源。整个等待过程,线程被中断过,返回true,否则返回false。获取到资源后而且被中断过,就会调用selfInterrupt将中断补上

tryAcquire方法由具体使用者类实现,不实现是会抛出异常的,所以必须实现

  1. protected boolean tryAcquire(int arg) {
  2. throw new UnsupportedOperationException();
  3. }

addWaiter方法作用,将当前线程包装成Node插入到CLH队列尾部

  1. private Node addWaiter(Node mode) {
  2. // 将当前线程封装成Node节点
  3. Node node = new Node(Thread.currentThread(), mode);
  4. // Try the fast path of enq; backup to full enq on failure
  5. Node pred = tail;
  6. // 尾节点不能空,队列已经初始化过
  7. if (pred != null) {
  8. // Node节点的前prev指针指向尾节点
  9. node.prev = pred;
  10. // cas锁操作,保证线程安全
  11. if (compareAndSetTail(pred, node)) {
  12. // 尾节点指针指向Node节点
  13. pred.next = node;
  14. return node;
  15. }
  16. }
  17. // 队列没初始化过,调用enq方法
  18. enq(node);
  19. return node;
  20. }

跟一下enq方法

  1. private Node enq(final Node node) {
  2. // 无限循环,类似于while(true),自旋操作
  3. for (;;) {
  4. // 尾节点
  5. Node t = tail;
  6. if (t == null) { // Must initialize
  7. // 创建new一个节点,设置为头节点
  8. if (compareAndSetHead(new Node()))
  9. // 头结点赋值给尾结点
  10. tail = head;
  11. } else {
  12. // 如果尾节点不为空,将Node节点的前驱指向尾节点
  13. node.prev = t;
  14. // cas,将node节点设置为尾节点
  15. if (compareAndSetTail(t, node)) {
  16. // 尾节点的next指针指向Node节点
  17. t.next = node;
  18. return t;
  19. }
  20. }
  21. }
  22. }

再往上看一下java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire方法的acquireQueued方法,前面说到acquireQueued方法作用是,线程排队等待,直到获取到资源

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;
  3. try {
  4. boolean interrupted = false;
  5. // 自旋,无限循环
  6. for (;;) {
  7. // 寻找node节点前驱节点
  8. final Node p = node.predecessor();
  9. // 前驱节点是head节点,tryAcquire
  10. if (p == head && tryAcquire(arg)) {
  11. // 获取资源成功,将node节点设置为head新的节点
  12. setHead(node);
  13. // 原来的head节点设置为null,等待GC垃圾回收
  14. p.next = null; // help GC
  15. failed = false;
  16. return interrupted;
  17. }
  18. // p不是头结点,或者tryAcquire()获取资源失败
  19. // shouldParkAfterFailedAcquire判断是否可以被park
  20. // parkAndCheckInterrupt判断park和Interrupt是否成功
  21. if (shouldParkAfterFailedAcquire(p, node) &&
  22. parkAndCheckInterrupt())
  23. interrupted = true;
  24. }
  25. } finally {
  26. if (failed)
  27. cancelAcquire(node);
  28. }
  29. }

3.2、共享式

共享式即共享资源可以被多个线程同时占有

先看顶层的acquireShared方法:

  1. public final void acquireShared(int arg) {
  2. if (tryAcquireShared(arg) < 0)
  3. doAcquireShared(arg);
  4. }

tryAcquireShared方法是由具体使用者类去实现的,所以先看看doAcquireShared源码:

  1. private void doAcquireShared(int arg) {
  2. // 封装为Node,标记为Node.SHARED
  3. final Node node = addWaiter(Node.SHARED);
  4. boolean failed = true;
  5. try {
  6. boolean interrupted = false;
  7. // 无限循环,自旋
  8. for (;;) {
  9. // 查找前驱节点
  10. final Node p = node.predecessor();
  11. // 是否为head节点
  12. if (p == head) {
  13. // 尝试抢资源
  14. int r = tryAcquireShared(arg);
  15. if (r >= 0) { // 抢资源成功
  16. // 设置为头节点
  17. setHeadAndPropagate(node, r);
  18. p.next = null; // help GC
  19. if (interrupted)
  20. selfInterrupt();
  21. failed = false;
  22. return;
  23. }
  24. }
  25. // p不是头结点,或者tryAcquire()获取资源失败
  26. // shouldParkAfterFailedAcquire判断是否可以被park
  27. // parkAndCheckInterrupt判断park和Interrupt是否成功
  28. if (shouldParkAfterFailedAcquire(p, node) &&
  29. parkAndCheckInterrupt())
  30. interrupted = true;
  31. }
  32. } finally {
  33. if (failed)
  34. cancelAcquire(node);
  35. }
  36. }

这个逻辑和独占式基本差不多,不同的地方在于入队的Node是标志为SHARED共享式的,tryAcquireShared是由具体的类去实现

参考资料

相关文章