ReentrantLock 实现原理

x33g5p2x  于2020-07-27 发布在 Java  
字(4.5k)|赞(0)|评价(0)|浏览(1330)

使用 synchronized 来做同步处理时,锁的获取和释放都是隐式的,实现的原理是通过编译后加上不同的机器指令来实现。

ReentrantLock 就是一个普通的类,它是基于 AQS(AbstractQueuedSynchronizer)来实现的。

是一个重入锁:一个线程获得了锁之后仍然可以反复的加锁,不会出现自己阻塞自己的情况。

AQSJava 并发包里实现锁、同步的一个重要的基础框架。

锁类型

ReentrantLock 分为公平锁非公平锁,可以通过构造方法来指定具体类型:

  1. //默认非公平锁
  2. public ReentrantLock() {
  3. sync = new NonfairSync();
  4. }
  5. //公平锁
  6. public ReentrantLock(boolean fair) {
  7. sync = fair ? new FairSync() : new NonfairSync();
  8. }

默认一般使用非公平锁,它的效率和吞吐量都比公平锁高的多(后面会分析具体原因)。

获取锁

通常的使用方式如下:

  1. private ReentrantLock lock = new ReentrantLock();
  2. public void run() {
  3. lock.lock();
  4. try {
  5. //do bussiness
  6. } catch (InterruptedException e) {
  7. e.printStackTrace();
  8. } finally {
  9. lock.unlock();
  10. }
  11. }

公平锁获取锁

首先看下获取锁的过程:

  1. public void lock() {
  2. sync.lock();
  3. }

可以看到是使用 sync的方法,而这个方法是一个抽象方法,具体是由其子类(FairSync)来实现的,以下是公平锁的实现:

  1. final void lock() {
  2. acquire(1);
  3. }
  4. //AbstractQueuedSynchronizer 中的 acquire()
  5. public final void acquire(int arg) {
  6. if (!tryAcquire(arg) &&
  7. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  8. selfInterrupt();
  9. }

第一步是尝试获取锁(tryAcquire(arg)),这个也是由其子类实现:

  1. protected final boolean tryAcquire(int acquires) {
  2. final Thread current = Thread.currentThread();
  3. int c = getState();
  4. if (c == 0) {
  5. if (!hasQueuedPredecessors() &&
  6. compareAndSetState(0, acquires)) {
  7. setExclusiveOwnerThread(current);
  8. return true;
  9. }
  10. }
  11. else if (current == getExclusiveOwnerThread()) {
  12. int nextc = c + acquires;
  13. if (nextc < 0)
  14. throw new Error("Maximum lock count exceeded");
  15. setState(nextc);
  16. return true;
  17. }
  18. return false;
  19. }
  20. }

首先会判断 AQS 中的 state 是否等于 0,0 表示目前没有其他线程获得锁,当前线程就可以尝试获取锁。

注意:尝试之前会利用 hasQueuedPredecessors() 方法来判断 AQS 的队列中中是否有其他线程,如果有则不会尝试获取锁(这是公平锁特有的情况)。

如果队列中没有线程就利用 CAS 来将 AQS 中的 state 修改为1,也就是获取锁,获取成功则将当前线程置为获得锁的独占线程(setExclusiveOwnerThread(current))。

如果 state 大于 0 时,说明锁已经被获取了,则需要判断获取锁的线程是否为当前线程(ReentrantLock 支持重入),是则需要将 state + 1,并将值更新。

写入队列

如果 tryAcquire(arg) 获取锁失败,则需要用 addWaiter(Node.EXCLUSIVE) 将当前线程写入队列中。

写入之前需要将当前线程包装为一个 Node 对象(addWaiter(Node.EXCLUSIVE))。

AQS 中的队列是由 Node 节点组成的双向链表实现的。

包装代码:

  1. private Node addWaiter(Node mode) {
  2. Node node = new Node(Thread.currentThread(), mode);
  3. // Try the fast path of enq; backup to full enq on failure
  4. Node pred = tail;
  5. if (pred != null) {
  6. node.prev = pred;
  7. if (compareAndSetTail(pred, node)) {
  8. pred.next = node;
  9. return node;
  10. }
  11. }
  12. enq(node);
  13. return node;
  14. }

首先判断队列是否为空,不为空时则将封装好的 Node 利用 CAS 写入队尾,如果出现并发写入失败就需要调用 enq(node); 来写入了。

  1. private Node enq(final Node node) {
  2. for (;;) {
  3. Node t = tail;
  4. if (t == null) { // Must initialize
  5. if (compareAndSetHead(new Node()))
  6. tail = head;
  7. } else {
  8. node.prev = t;
  9. if (compareAndSetTail(t, node)) {
  10. t.next = node;
  11. return t;
  12. }
  13. }
  14. }
  15. }

这个处理逻辑就相当于自旋加上 CAS 保证一定能写入队列。

挂起等待线程

写入队列之后需要将当前线程挂起(利用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)):

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;
  3. try {
  4. boolean interrupted = false;
  5. for (;;) {
  6. final Node p = node.predecessor();
  7. if (p == head && tryAcquire(arg)) {
  8. setHead(node);
  9. p.next = null; // help GC
  10. failed = false;
  11. return interrupted;
  12. }
  13. if (shouldParkAfterFailedAcquire(p, node) &&
  14. parkAndCheckInterrupt())
  15. interrupted = true;
  16. }
  17. } finally {
  18. if (failed)
  19. cancelAcquire(node);
  20. }
  21. }

首先会根据 node.predecessor() 获取到上一个节点是否为头节点,如果是则尝试获取一次锁,获取成功就万事大吉了。

如果不是头节点,或者获取锁失败,则会根据上一个节点的 waitStatus 状态来处理(shouldParkAfterFailedAcquire(p, node))。

waitStatus 用于记录当前节点的状态,如节点取消、节点等待等。

shouldParkAfterFailedAcquire(p, node) 返回当前线程是否需要挂起,如果需要则调用 parkAndCheckInterrupt()

  1. private final boolean parkAndCheckInterrupt() {
  2. LockSupport.park(this);
  3. return Thread.interrupted();
  4. }

他是利用 LockSupportpart 方法来挂起当前线程的,直到被唤醒。

非公平锁获取锁

公平锁与非公平锁的差异主要在获取锁:

公平锁就相当于买票,后来的人需要排到队尾依次买票,不能插队

而非公平锁则没有这些规则,是抢占模式,每来一个人不会去管队列如何,直接尝试获取锁。

非公平锁:

  1. final void lock() {
  2. //直接尝试获取锁
  3. if (compareAndSetState(0, 1))
  4. setExclusiveOwnerThread(Thread.currentThread());
  5. else
  6. acquire(1);
  7. }

公平锁:

  1. final void lock() {
  2. acquire(1);
  3. }

还要一个重要的区别是在尝试获取锁时tryAcquire(arg),非公平锁是不需要判断队列中是否还有其他线程,也是直接尝试获取锁:

  1. final boolean nonfairTryAcquire(int acquires) {
  2. final Thread current = Thread.currentThread();
  3. int c = getState();
  4. if (c == 0) {
  5. //没有 !hasQueuedPredecessors() 判断
  6. if (compareAndSetState(0, acquires)) {
  7. setExclusiveOwnerThread(current);
  8. return true;
  9. }
  10. }
  11. else if (current == getExclusiveOwnerThread()) {
  12. int nextc = c + acquires;
  13. if (nextc < 0) // overflow
  14. throw new Error("Maximum lock count exceeded");
  15. setState(nextc);
  16. return true;
  17. }
  18. return false;
  19. }

释放锁

公平锁和非公平锁的释放流程都是一样的:

  1. public void unlock() {
  2. sync.release(1);
  3. }
  4. public final boolean release(int arg) {
  5. if (tryRelease(arg)) {
  6. Node h = head;
  7. if (h != null && h.waitStatus != 0)
  8. //唤醒被挂起的线程
  9. unparkSuccessor(h);
  10. return true;
  11. }
  12. return false;
  13. }
  14. //尝试释放锁
  15. protected final boolean tryRelease(int releases) {
  16. int c = getState() - releases;
  17. if (Thread.currentThread() != getExclusiveOwnerThread())
  18. throw new IllegalMonitorStateException();
  19. boolean free = false;
  20. if (c == 0) {
  21. free = true;
  22. setExclusiveOwnerThread(null);
  23. }
  24. setState(c);
  25. return free;
  26. }

首先会判断当前线程是否为获得锁的线程,由于是重入锁所以需要将 state 减到 0 才认为完全释放锁。

释放之后需要调用 unparkSuccessor(h) 来唤醒被挂起的线程。

总结

由于公平锁需要关心队列的情况,得按照队列里的先后顺序来获取锁(会造成大量的线程上下文切换),而非公平锁则没有这个限制。

所以也就能解释非公平锁的效率会被公平锁更高。

相关文章

最新文章

更多