队列同步器AQS,用来构建锁或者其他同步组件的基础框架,它有int类型的 state变量表示同步状态,以及内置的FIFO队列来完成志愿获取线程的排队工作。
private volatile int state;//共享变量,使用volatile修饰保证线程可见性
AQS提供了3个修改状态的方法:
方法 | 作用 |
---|---|
protected boolean tryAcquire(int arg) | 这是独占式获取同步状态的方法,该方法的实现需要查询到当前的同步状态,同时做出相应判断,最后再通过CAS设置同步状态 |
protected boolean tryRelease(int arg) | 这是独占式释放同步状态的方法,让那些等待获取同步状态的线程能够有机会获取同步状态 |
protected int tryAcquireShared(int arg) | 这是共享式获取同步状态的方法,返回的值大于等于0,说明就获取成功了,否则,就是获取失败 |
protected boolean tryReleaseShared(int arg) | 这是共享式释放同步状态的方法,让那些等待获取同步状态的线程能够有机会获取同步状态 |
protected boolean isHeldExclusively() | 这个方法用来判断当前同步器是否在独占模式下被线程占用,它会取出占用的线程和当前线程做个比较,看下是否相等 |
模版方法分为3类:独占式获取和释放同步状态、共享式获取和释放同步状态、查询同步队列中的等待线程情况
方法 | 作用 |
---|---|
public final void acquire(int arg) | 这个方法是独占式获取同步状态的方法,该方法会调用重写的tryAcquire()方法来配合做结果判断,如果当前线程获取同步状态是成功的,该方法就会返回,如果不成功就会进入同步队列等待 |
public final void acquireInterruptibly(int arg) | 和acquire()方法类似,不过它多了个响应中断的能力,当前线程未获取到同步状态,一样会进入同步队列,但是如果当前线程被中断,那就会抛出InterruptedException异常并返回 |
public final boolean tryAcquireNanos(int arg, long nanosTimeout) | 在acquireInterruptibly()方法的基础上又加了超时的限制,这样可以让在规定时间内无法获取到同步器状态的线程直接返回false,当然了如果获取到则返回true。 |
public final void acquireShared(int arg) | 这是共享式获取同步状态的方法,该方法会调用重写的tryAcquireShared()方法来配合做结果判断,如果当前线程获取同步状态是成功的,该方法就会返回,如果不成功就会进入同步队列等待,不过这个方法支持同一时刻可以有多个线程获取同步状态。 |
public final void acquireSharedInterruptibly(int arg) | 和acquireShared()方法类似,不过它多了个响应中断的能力,当前线程未获取到同步状态,一样会进入同步队列,但是如果当前线程被中断,那就会抛出InterruptedException异常并返回 |
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) | 在acquireSharedInterruptibly()方法的基础上又加了超时的限制,这样可以让在规定时间内无法获取到同步器状态的线程直接返回false,当然了如果获取到则返回true。 |
public final boolean release(int arg) | 这也是独占式的方法,用来释放同步状态,然后将同步队列中的第一个节点包含的线程唤醒 |
public final boolean releaseShared(int arg) | 这也是共享式的方法,用来释放同步状态,然后将同步队列中的第一个节点包含的线程唤醒 |
public final Collection getQueuedThreads() | 获取等待在同步队列上的线程集合 |
同步队列依赖内部的同步队列来完成状态的管理,当前线程获取同步队列失败时,同步器会将当前线程以及等待状态等信息构造一个节点(Node)并放入同步队列中,同时会阻塞当前线程,当前状态释放时候,会把首节点中的线程唤醒,使其再次获取同步状态
等待状态:包含如下状态
前驱节点,当节点加入到同步队列时被设置(尾部添加)
后继节点
等待队列中的后继节点。若当前节点是共享的,那么这个字段将是一个SHARED常量,也是说节点类别(独占和共享)和等待队列中的后继节点共用同一个字段
获取同步状态的线程
A、主要过程
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
如代码所示:
B、addWaiter方法:节点构造并加入同步队列中,构造的节点是独占式的(Node.EXCLUSIVE)
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
C、acquireQueued:节点进入同步队列中自旋,每个节点都在检查,当条件满足了,获取同步状态,就可以自旋过程退出,否则留在字段过程中,并阻塞节点的线程
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
D、acquire流程图如下
当前线程获取同步状态并执行了相应的逻辑之后,就需要释放同步状态,时候去节点获取同步状态
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 找到第一个节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
主要过程:
A、调用tryAcquireShared尝试获取同步状态、如果返回值>0,则获取成功
B、否则,在自旋过程中,如果是头节点,并尝试同步状态成功,如果获取状态>0,唤醒后继节点,并退出自旋
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
释放同步状态,并唤醒后继节点
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
doAcquireNanos(int arg, long nanosTimeout)可以超时获取同步状态,也就是在固定时间段内获取同步状态,如果获取成功则返回true;发否则返回fase
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
独占式获取同步状态流程图
欢迎关注公众号"程序员ZZ的源码",一起学习
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/u012998254/article/details/77922663
内容来源于网络,如有侵权,请联系作者删除!