Jdk源码分析

文章40 |   阅读 17322 |   点赞0

来源:https://yumbo.blog.csdn.net/category_10384063.html

以ReentrantLock的非公平锁为例深入解读AbstractQueuedSynchronizer源码

x33g5p2x  于2021-12-18 转载在 其他  
字(9.9k)|赞(0)|评价(0)|浏览(460)

以下面这段代码为例,我们分析以下ReentrantLock的工作原理,聊一聊,ReentrantLock到底做了哪些事情!

  1. public class ReentrantLockTest {
  2. static ReentrantLock lock = new ReentrantLock();
  3. public static void main(String[] args) {
  4. new Thread(()->{
  5. lock.lock();
  6. try {
  7. System.out.println("A:这是一段加锁代码段");
  8. } finally {
  9. //lock.unlock();
  10. }
  11. },"A").start();
  12. new Thread(()->{
  13. lock.lock();
  14. try {
  15. System.out.println("B:这是一段加锁代码段");
  16. } finally {
  17. lock.unlock();
  18. }
  19. },"B").start();
  20. }
  21. }

在上面的代码你是不是疑惑为什么线程A的lock.unlock()是被注释掉的。是不是我写错了?
这样不是会导致死锁?

这是我有意为之的,因为如题,今天我们是为了学习AQS而来的。

为了学习效果好,最后将上面的代码复制粘贴到IDEA / eclipse中我们调试一下这段程序。

先看下线程A它在做什么

首先在类的初始化时成员属性lock就被赋值new ReentrantLock();得到的是一个ReentrantLock实例,并且成员属性sync被赋值为非公平锁NonfairSync实例
对应源码

然后当我们让线程start()时,在执行lock.lock()方法时
调用的时ReentrantLock中的lock()

由于我们前面将sync = new NonfairSync();因此本质上调用的是NonfairSync继承父类Sync.lock(),
代码就执行到了

这里面有一个判断,判断中的方法在子类NonfairSync中有实现Sync中定义的abstract方法。
这个时候需要到Sync的子类NonfairSync中找这个方法。如下

  1. final boolean initialTryLock() {
  2. Thread current = Thread.currentThread(); // 获取当前线程
  3. if (compareAndSetState(0, 1)) { // 之前没有加锁,则将state更新为1,并进行加锁 setExclusiveOwnerThread(current);
  4. setExclusiveOwnerThread(current); // 将当前线程设置为独占
  5. return true; // 加锁成功
  6. } else if (getExclusiveOwnerThread() == current) {// 之前加过锁,相当于重入锁(也意味着当前线程本来就获得到了锁),第二次进入则进入代码块
  7. int c = getState() + 1; // 将锁计数+1,相当于多次加锁,每加一次锁就会+1
  8. if (c < 0) // 小于0,可能是超过int的上限导致变成负数抛异常
  9. throw new Error("Maximum lock count exceeded");
  10. setState(c);//更新state值
  11. return true;
  12. } else //这个分支说明当前线程所操作的资源已经被加锁了,需要等待释放锁后获得锁。所以返回false
  13. return false;
  14. }

阅读这段代码,会发现第一个判断调用的则是AQS的final方法

  1. // CAS操作对state设置新值,(如果state的值为expect则将state更新为update)
  2. protected final boolean compareAndSetState(int expect, int update) {
  3. return U.compareAndSetInt(this, STATE, expect, update);
  4. }

而这个U则是封装好的一个支持cpu指令操作的一个工具类,目的就是支持CAS操作

  1. private static final Unsafe U = Unsafe.getUnsafe(); //用的都是最底层的操作,里面有很多关于cas操作的native方法

实际上就是将state值由0,变成1
只是这种操作更安全,因为是原子性的一个操作更新值。如果更新成功则进入第一个代码块
执行 setExclusiveOwnerThread(current);将当前线程设置为独占线程,本质上就是将当前线程存起来,追踪过去又跑到了AQS父类AOS的方法原方法的源码

  1. private transient Thread exclusiveOwnerThread; //成员属性
  2. protected final void setExclusiveOwnerThread(Thread thread) {
  3. exclusiveOwnerThread = thread; //给成员属性赋值
  4. }

所以这个方法也就没啥好讲的,意思就是将当前线程设置为独占排他线程。也就是一个象征性的意义没有多大作用,真正控制线程的在后面,我们继续深入。

执行initialTryLock()方法时,根据方法内的3个分支
1.第一次调用lock
2.同一个线程重复调用了lock
3.不同线程使用同一把锁调用了lock

三个分支会导致3种情况
1、2分支则初始化锁成功,返回true
3则会返回false表示初始化锁失败

回到之前Sync中调用initialTryLock()的代码,由于是第一次加锁因此执行的是第一个分支,返回true

下面是我加上注释后的源码

  1. @ReservedStackAccess
  2. final void lock() {
  3. //initialTryLock()本质上调用的是子类重写的方法例如:NonfairSync和FairSync内的initialTryLock()
  4. // 只有已经上锁了,当前线程没有获得到锁才会进入if
  5. if (!initialTryLock()){
  6. /** * 如果是不同的线程则返回false,就执行if内的 acquire(1); * 又去执行AQS的 public final void acquire(int arg)方法, * 意思就是需要将其加入线程等待队列中(判断中initialTryLock()就已经知道这个线程没有获得到锁返回了false) */
  7. acquire(1);// AQS的线程队列是在这个方法内部形成的,需求去父类AQS查看这个final修饰方法
  8. }
  9. }

会发现!true返回false则if内的acquire(1);不会执行就返回了。
然后对应我们线程A的lock方法也就执行完了。

  1. new Thread(()->{
  2. try {
  3. lock.lock();
  4. System.out.println("A:这是一段加锁代码段");
  5. } finally {
  6. //lock.unlock();
  7. }
  8. },"A").start();

然后就会执行打印输出语句,在控制台中打印出A:这是一段加锁代码段
由于解锁语句被注释了,因此这个线程内的代码就执行完了。

接下来我们看下线程B它的过程

B和A的逻辑差不多,但线程B如何执行的?
同样会进入 lock,本质调用Sync类中定义的 lock
也会初始化锁执行initialTryLock()但是由于此时的线程是另一个线程我们粘贴一下源码

  1. final boolean initialTryLock() {
  2. Thread current = Thread.currentThread(); // 获取当前线程
  3. if (compareAndSetState(0, 1)) { // 之前没有加锁,则将state更新为1,并进行加锁 setExclusiveOwnerThread(current);
  4. setExclusiveOwnerThread(current); // 将当前线程设置为独占
  5. return true; // 加锁成功
  6. } else if (getExclusiveOwnerThread() == current) {// 之前加过锁,相当于重入锁(也意味着当前线程本来就获得到了锁),第二次进入则进入代码块
  7. int c = getState() + 1; // 将锁计数+1,相当于多次加锁,每加一次锁就会+1
  8. if (c < 0) // 小于0,可能是超过int的上限导致变成负数抛异常
  9. throw new Error("Maximum lock count exceeded");
  10. setState(c);//更新state值
  11. return true;
  12. } else //这个分支说明当前线程所操作的资源已经被加锁了,需要等待释放锁后获得锁。所以返回false
  13. return false;
  14. }

会发现此时的state state是AQS类的成员属性 之前被上一个线程更新为了1,因为使用的是同一把锁。
第一个分支不走
又因为第二个判断发现当前线程B,和独占线程A不是同一个线程因此分支2也不走

  1. getExclusiveOwnerThread() // 返回独占线程,来自AQS父类AOS方法

因此最终返回的是false

因此下面 if 中的代码需要执行

  1. @ReservedStackAccess
  2. final void lock() {
  3. if (!initialTryLock()){
  4. acquire(1);// AQS的线程队列是在这个方法内部形成的,需求去父类AQS查看这个final修饰方法
  5. }
  6. }

acquire(int);方法是来自非公平锁的父类的父类AQS内部定义的final方法

重点来了,需要集中注意!这里是分水岭

在AQS中源码是这样定义的,下面有两个acquire上面的会调用下面更多参数的acquire方法

下面的代码可以先不看,先拉到后面看我的解说,然后对照着我的解说看代码

  1. /** * 尝试获得到锁,实际调用下面多个参数的acquire()方法 */
  2. public final void acquire(int arg) {
  3. /** * tryAcquire(arg)调用的是子类实现的tryAcquire(int),本身在AQS则是一个抽象方法 * 需要在子类中查看具体实现,例如:ReentrantLock中的内部类NonfairSync、FairSync中的tryAcquire(int) * 因此需要去子类查看这个方法的实现只有失败了才会进一步调用acquire(null, 1, false, false, false, 0L); */
  4. if (!tryAcquire(arg)) {
  5. // 注意参数值除了arg的值是变量,其它都是0会false或null,一遍而言传入的是1,除非一次性加了多次锁
  6. acquire(null, arg, false, false, false, 0L);
  7. }
  8. }
  9. /** * 抢占锁的方法,加锁的时候除了arg=1其它都是null或false * * @param node * @param arg 加锁次数 * @param shared 控制是否时共享线程队列也就是SharedNode的布尔值 * @param interruptible 是否时可中断线程 * @param timed 是否由最长等待时间 * @param time 中断超时时间 */
  10. final int acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time) {
  11. Thread current = Thread.currentThread(); // 获取当前线程
  12. byte spins = 0, postSpins = 0; //
  13. boolean interrupted = false, first = false; // 中断变量值interrupted,first表示第一次进入方法
  14. Node pred = null; //
  15. // 自旋获取锁
  16. for (; ; ) {
  17. // 循环的第一次判断为 true && false (node为null pred=null,null!=null。返回false就不执行后面的赋值和判断)
  18. // 自旋后的第二次由于是刚创建的则prev为null因此还是false,pred=null,还是false不执行
  19. // 当node!=null且node.prev!=null说明节点已经入队了,因此第二个判断返回true需要判断!(first = (head == pred))返回的是false
  20. if (!first && (pred = (node == null) ? null : node.prev) != null && !(first = (head == pred))) {
  21. // 进入这个代码块说明队列不为空且不止一个线程在等待
  22. if (pred.status < 0) { // 实际就是传入的node的前一个节点的status是否<0
  23. cleanQueue(); // 清空队列
  24. continue;
  25. } else if (pred.prev == null) {
  26. Thread.onSpinWait(); // 确保序列化
  27. continue;
  28. }
  29. }
  30. //循环第一次 false || true,需要进入代码块。pred在第一次判断就被赋值为null,循环进入前也是null
  31. if (first || pred == null) {
  32. boolean acquired;
  33. try {
  34. if (shared) {
  35. // 循环第一次false进入else分支
  36. acquired = (tryAcquireShared(arg) >= 0);
  37. } else {
  38. /** * 尝试抢占锁,如果没有抢到则会自旋,tryAcquire(arg);会一直重复调用,直到抢占成功 * 子类实现的方法。例如非公平锁的tryAcquired(1); * 如果state为0,则acquired则会变成true。将当前线程设置为独占并更新state为arg * 如果state不为0,则acquired=false,说明被其它线程上锁了 * * 自旋的起点是后面 * if (node == null) { * if (shared)// 如果是共享队列节点则创建SharedNode,然后由于后面没有代码则会自旋for循环重新执行一遍只是这个时候node不为null * node = new SharedNode(); //这个是线程队列的头节点,用来标识这个队列是一个共享锁线程等待队列 * else // 如果是一个排他锁创建一个排他节点 * node = new ExclusiveNode();// 线程队列的头节点,标识是一个排他锁线程队列 * } * 然后到这里的tryAcquire(arg);一直原地踏步,直到抢占到锁 * acquired更新为true * 进入catch后面的if * */
  39. acquired = tryAcquire(arg); // 会回到子类(NonfairSync、FairSync等)的实现的方法尝试获得锁,如果失败则还是false,直到成功true
  40. }
  41. } catch (Throwable ex) {
  42. cancelAcquire(node, interrupted, false);
  43. throw ex;
  44. }
  45. // true说明当前线程抢占到锁了
  46. if (acquired) {
  47. if (first) {
  48. node.prev = null;
  49. head = node;
  50. pred.next = null;
  51. node.waiter = null;
  52. if (shared)
  53. signalNextIfShared(node);
  54. if (interrupted)
  55. current.interrupt();
  56. }
  57. return 1; // 返回1标签抢占到锁了这是这个方法的唯一结束点,其它分支始终都会死循环
  58. }
  59. }
  60. /** * AQS队列的形成起点,头节点就是下面的SharedNode或ExclusiveNode,然后自旋 */
  61. // 第一次进入node传入的是null因此进入
  62. // 第二次由于第一次进入后node=new SharedNode();或node = new ExclusiveNode();则这个if就不会在进入,会进入第二个if因为pred=null
  63. if (node == null) {
  64. if (shared)// 如果是共享队列节点则创建SharedNode,然后由于后面没有代码则会自旋for循环重新执行一遍只是这个时候node不为null
  65. node = new SharedNode(); //这个是线程队列的头节点,用来标识这个队列是一个共享锁线程等待队列
  66. else // 如果是一个排他锁创建一个排他节点
  67. node = new ExclusiveNode();// 线程队列的头节点,标识是一个排他锁线程队列
  68. } else if (pred == null) { // 尝试将当前线程入队
  69. node.waiter = current; // 将当前线程存入ExclusiveNode节点的waiter成员变量
  70. Node t = tail; // 一开始tail=null
  71. node.setPrevRelaxed(t); // 将node的prev指向tail,而此时node.next还是null,相当于加入到了队列的末尾,后面需要将队列末尾指向node形成双向的队列
  72. if (t == null) // 第一次队列还没有形成因此t是null需要将头节点初始化
  73. tryInitializeHead(); // 初始化头节点,并且在内部将tail也执行了这个初始的头节点
  74. else if (!casTail(t, node)) // casTail(t, node)会将tail变成node
  75. node.setPrevRelaxed(null); // 如果tail更新失败。则node.prev=null则又会重新进入if进行更新,直到更新成功。
  76. else
  77. t.next = node; // 将队列的末尾指向node形成双向队列(node.next则是null,也就是说aqs队列的末尾元素的next始终为null,prev会指向前一个节点)
  78. } else if (first && spins != 0) { //
  79. --spins; // 让出cpu使用权,减少线程调度得不公平性
  80. Thread.onSpinWait(); // 让出cpu使用全和Thread.sleep(0)差不多的作用,但是Thread.onSpinWait();更高效,使用的是cpu指令
  81. } else if (node.status == 0) { // 0是初始化赋得值,这里aqs队列得节点自然是要要让线程等待,因此更新status值为1(WAITING得值就是常量1)
  82. node.status = WAITING; // 如果status为0更新status值为常量WAITING=1,1表示等待
  83. } else {
  84. long nanos;
  85. spins = postSpins = (byte) ((postSpins << 1) | 1);// spins!=0则会调用Thread.onSpinWait();,让当前线程让出cpu的使用权
  86. if (!timed)//如果没有设置阻塞时间
  87. LockSupport.park(this);// 阻塞当前线程
  88. else if ((nanos = time - System.nanoTime()) > 0L)//如果设置了阻塞时长且时间nanos > 0
  89. LockSupport.parkNanos(this, nanos); //阻塞nanos纳秒
  90. else // 如果时间不合法 则break
  91. break;
  92. node.clearStatus();// 将status重新更新为0
  93. if ((interrupted |= Thread.interrupted()) && interruptible)
  94. break;
  95. }
  96. }
  97. return cancelAcquire(node, interrupted, interruptible);//返回0 或者 返回CANCELLED常量负数
  98. }

会发现执行acquire(1);
内部有一个 if 判断
判断内容是,将acquires的值设置为1

  1. /** * 代码来自非公平锁内部 * 尝试获取锁(加锁)acquires次 */
  2. protected final boolean tryAcquire(int acquires) {
  3. // 如果加锁线程已经释放了锁,也就是state=0那么就将当前线程设置为独占线程并更新state值为1,表示抢占锁成功
  4. if (getState() == 0 && compareAndSetState(0, acquires)) {
  5. setExclusiveOwnerThread(Thread.currentThread());// 将当前线程设置为独占线程,表示当前线程获得到锁
  6. return true;
  7. }
  8. return false;//如果已经加过锁了由于state则不为0则会返回false,表示抢占锁失败
  9. }

这里面的逻辑和前面我们分析过的差不多,很明显会返回false,因为state!=0
因为返回false,所以导致内部代码块需要执行也就是

  1. if (!tryAcquire(1)) {
  2. acquire(null, 1, false, false, false, 0L);
  3. }

然后我们将这些参数值带入去看下多个参数的acquire方法
这个时候拉到上面我粘贴出来的代码,结合我的注释分析一遍流程。
如果我的注释中有错误的地方,可以在评论区给我指出,我会更新文章改正过来。
关于AQS的源码我也有点疑惑,那就是关于这个方法的spins、postSpins这个参数的具体作用是干什么的。从子面量的意思来说分别表示spins和过期的spins,分别用这个两个变量存。

经过这个方法,会形成如下的队列
线程A是持有锁的线程因此不在aqs队列中,而B则因为没有获得锁,就进入了等待队列。
B指我上面的线程B产生的节点,在B之前会创建一个节点,然后利用这个节点作为头,真正的B却是head.next得到的aqs队列实际意义上的队列头。
这么做的目的就是为了方便利用AQS的成员属性快速找到头。相当于始终有一个head指针指向了aqs队列的头,始终有一个tail指针指向队列末尾节点。

通过源码我们很容易看出aqs利用自旋的方式创建了一个CLH队列,然后利用LockSupport.park()将线程进行阻塞

LockSupport源码

而释放锁在我们熟悉了AQS的数据结构后,以及前面的基础后很容易定位到源码
源码依此是
ReentrantLock类

  1. public void unlock() {
  2. sync.release(1);
  3. }

AQS类

  1. public final boolean release(int arg) {
  2. if (tryRelease(arg)) {
  3. signalNext(head);//唤醒head.next所指向的节点线程
  4. return true;
  5. }
  6. return false;
  7. }

Sync类

  1. // 尝试释放锁
  2. @ReservedStackAccess
  3. protected final boolean tryRelease(int releases) {
  4. int c = getState() - releases;//计算释放后的state值
  5. if (getExclusiveOwnerThread() != Thread.currentThread())// 如果当前线程不是和独占线程同一个线程抛异常IllegalMonitorStateException
  6. throw new IllegalMonitorStateException();
  7. boolean free = (c == 0);// 计算是否要清除独占先(计算释放后是否还有线程持有锁)
  8. if (free)
  9. setExclusiveOwnerThread(null);//清除独占线程
  10. setState(c);//更新state值
  11. return free;
  12. }
  1. /** * 唤醒线程队列节点中的线程 * 传入的是head,通过head.next得到等待线程的第一个节点将其唤醒 */
  2. private static void signalNext(Node h) {
  3. Node s;
  4. if (h != null && (s = h.next) != null && s.status != 0) {
  5. s.getAndUnsetStatus(WAITING); // 更新线程状态值为常量1
  6. LockSupport.unpark(s.waiter); // 给当前节点的线程发放一个许可,唤醒该线程
  7. }
  8. }

执行完signalNext后就会将队列第一个节点中的线程唤醒,仔细观察if中的判断,会发现这个方法中的h应该传入的是head的意思,也就是说将真正意义的线程队列第一个节点(实际通过head.next得到)然后通过LockSupport唤醒该节点中的线程

相关文章