小老弟用 案列 引出 ReentrantLock实现原理

277次阅读  |  发布于3年以前

上一遍我们深入分析了 AQS 的设计与实现,了解到 AQS 是 JUC 包实现的基础支撑。[1.3万字,从5个方面说清楚AQS 队列同步器]

本文我们就来分析一个基于 AQS 实现的 JUC 组件,即 ReentrantLock。

ReentrantLock 译为可重入锁,我们在使用时总是将其与 synchronized 关键字进行对比,实际上 ReentrantLock 与 synchronized 关键字在使用上具备相同的语义,区别仅在于 ReentrantLock 相对于 synchronized 关键字留给开发者的可操作性更强,所以在使用上更加灵活,当然凡事都有两面,灵活的背后也暗藏着更加容易出错的风险。

尽管语义相同,但 ReentrantLock 和 synchronized 关键字背后的实现机制却大相径庭。前面的文章中我们分析了 synchronized 关键字的实现内幕,知道了 synchronized 关键字背后依赖于 monitor 技术,而本文所要分析的 ReentrantLock 在实现上则依赖于 AQS 队列同步器,具体如何基于 AQS 进行实现,下面来一探究竟。

ReentrantLock 示例

本小节使用 ReentrantLock 实现一个 3 线程交替打印的程序,演示基于 ReentrantLock 实现锁的获取、释放,以及线程之间的通知机制。示例实现如下:

private static Lock lock = new ReentrantLock(true);

private static Condition ca = lock.newCondition();
private static Condition cb = lock.newCondition();
private static Condition cc = lock.newCondition();

private static volatile int idx = 0;

private static class A implements Runnable {

    @Override
    public void run() {
        try {
            lock.lock();
            for (int i = 0; i < 10; i++) {
                cb.signalAll();
                System.out.println("a: " + (++idx));
                ca.await();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

private static class B implements Runnable {

    @Override
    public void run() {
        try {
            lock.lock();
            for (int i = 0; i < 10; i++) {
                cc.signalAll();
                System.out.println("b: " + (++idx));
                cb.await();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

private static class C implements Runnable {

    @Override
    public void run() {
        try {
            lock.lock();
            for (int i = 0; i < 10; i++) {
                ca.signalAll();
                System.out.println("c: " + (++idx));
                cc.await();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

public static void main(String[] args) {
    new Thread(new A()).start();
    new Thread(new B()).start();
    new Thread(new C()).start();
}

上述示例定义了 3 个线程类 A、B 和 C,并按照 A -> B -> C 的顺序进行组织,各个线程在调用 Lock#lock 方法获取到锁之后会先尝试通知后继线程(将对应的线程移入到同步队列),然后对 idx 变量进行累加并打印,接着进入等待状态并释放资源,方法 Lock#unlock 接下来会调度位于同步队列队头结点的线程继续执行。

ReentrantLock 实现内幕

Lock 接口

ReentrantLock 实现了 Lock 接口,该接口抽象了锁应该具备的基本操作,包括锁资源的获取、释放,以及创建条件对象。除了本文介绍的 ReentrantLock 外,JUC 中直接或间接实现了 Lock 接口的组件还包括 ReentrantReadWriteLock 和 StampedLock,我们将在后面的文章中对这些组件逐一分析。Lock 接口的定义如下:

public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}

各方法释义如下:

资源的获取与释放

前面分析了 Lock 接口的定义,ReentrantLock 实现了该接口,并将接口方法的实现都委托给了 Sync 内部类处理。Sync 是一个抽象类,继承自 AbstractQueuedSynchronizer,并派生出 FairSync 和 NonfairSync 两个子类(继承关系如下图),由命名可以看出 FairSync 实现了公平锁,而 NonfairSync 则实现了非公平锁。

ReentrantLock 提供了带 boolean 参数的构造方法,依据该参数来决定是创建公平锁还是非公平锁(默认为非公平锁),构造方法定义如下:

public ReentrantLock() {
    // 默认创建非公平锁
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    // 依据参数决定创建公平锁还是非公平锁
    sync = fair ? new FairSync() : new NonfairSync();
}

下面将区分公平锁和非公平锁分析 ReentrantLock 针对 Lock 接口方法的具体实现,在开始之前先介绍一下 AQS 中的 state 字段在 ReentrantLock 中的作用。

我们知道 ReentrantLock 是可重入的,这里的可重入是指当一个线程获取到 ReentrantLock 锁之后,如果该线程再次尝试获取该 ReentrantLock 锁时仍然可以获取成功,对应的重入次数加 1。ReentrantLock 的重入次数则由 AQS 的 state 字段进行记录。当 state 为 0 时,说明目标 ReentrantLock 锁当前未被任何线程持有,当一个线程释放 ReentrantLock 锁时,对应的 state 值需要减 1。

非公平锁

本小节我们来分析一下非公平锁 NonfairSync 的实现机制,首先来看一下 NonfairSync#lock 方法,该方法用于获取资源,如果获取失败则会将当前线程加入到同步队列中阻塞等待。方法实现如下:

final void lock() { // 尝试获取锁,将 state 由 0 设置为 1 if (this.compareAndSetState(0, 1)) { // 首次获取锁成功,记录当前锁对象 this.setExclusiveOwnerThread(Thread.currentThread()); } else { // 目标锁对象已经被占用,或者非首次获取目标锁对象 this.acquire(1); } } 方法 NonfairSync#lock 加锁的过程首先会基于 CAS 操作尝试将 ReentrantLock 的 state 值由 0 改为 1,抢占锁资源,这也是非公平语义的根本所在。如果操作成功,则说明目标 ReentrantLock 锁当前未被任何线程持有,且本次加锁成功。如果操作失败则区分两种情况:

  1. 目标 ReentrantLock 锁已被当前线程持有
  2. 目标 ReentrantLock 锁已被其它线程持有

针对这两种情况,接下来会调用 AbstractQueuedSynchronizer#acquire 方法尝试获取 1 个单位的资源,该方法由 AQS 实现,我们已经在前面的文章中分析过,其中会执行模板方法 AbstractQueuedSynchronizer#tryAcquire。NonfairSync 针对该模板方法的实现如下:

protected final boolean tryAcquire(int acquires) {
    return this.nonfairTryAcquire(acquires);
}

上述方法将尝试获取资源的逻辑委托给 Sync#nonfairTryAcquire 方法执行,ReentrantLock 的 ReentrantLock#tryLock() 方法同样基于该方法实现。下面来分析一下该方法的执行逻辑,实现如下:

final boolean nonfairTryAcquire(int acquires) {
    // 获取当前线程对象
    final Thread current = Thread.currentThread();
    // 获取 state 值
    int c = this.getState();
    if (c == 0) {
        // state 为 0,表示目标锁当前未被持有,尝试获取锁
        if (this.compareAndSetState(0, acquires)) {
            this.setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 如果当前已经持有锁的线程已经是当前线程
    else if (current == this.getExclusiveOwnerThread()) {
        // 重入次数加 1
        int nextc = c + acquires;
        if (nextc < 0) {
            // 重入次数溢出
            throw new Error("Maximum lock count exceeded");
        }
        // 更新 state 记录的重入次数
        this.setState(nextc);
        return true;
    }
    // 已经持有锁的线程不是当前线程,尝试加锁失败
    return false;
}

方法 Sync#nonfairTryAcquire 的执行流程可以概括为;

  1. 获取当前 ReentrantLock 锁的 state 值;
  2. 如果 state 值为 0,说明当前 ReentrantLock 锁未被任何线程持有,基于 CAS 尝试将 state 值由 0 改为 1,抢占锁资源,修改成功即为加锁成功;
  3. 否则,如果当前已经持有该 ReentrantLock 锁的线程是自己,则修改重入次数(即将 state 值加 1);
  4. 否则,目标 ReentrantLock 锁已经被其它线程持有,加锁失败。

如果 Sync#nonfairTryAcquire 方法返回 false,则说明当前线程尝试获取目标 ReentrantLock 锁失败,对于 ReentrantLock#lock 方法而言,接下去线程会被加入到同步队列阻塞等待,而对于 ReentrantLock#tryLock() 方法而言,线程会立即退出,并返回 false。

方法 ReentrantLock#newCondition 同样是委托给 Sync#newCondition 方法处理,该方法只是简单的创建了一个 ConditionObject 对象,即新建了一个条件队列。非公平锁 NonfairSync 中的以下方法都是直接委托给 AQS 处理,这些方法的实现机制已在前面分析 AQS 时介绍过:

ReentrantLock#lockInterruptibly:直接委托给 AbstractQueuedSynchronizer#acquireInterruptibly 方法实现,获取的资源数为 1。ReentrantLock#tryLock(long, java.util.concurrent.TimeUnit):直接委托给 AbstractQueuedSynchronizer#tryAcquireNanos 方法实现,获取的资源数为 1。

ReentrantLock#unlock:直接委托给 AbstractQueuedSynchronizer#release 方法实现,释放的资源数为 1。前面的文章,我们在分析 AQS 的 AbstractQueuedSynchronizer#release 方法时,曾介绍过该方法会调用模板方法 AbstractQueuedSynchronizer#tryRelease 以尝试释放资源。ReentrantLock 针对该模板方法的实现位于 Sync 抽象类中,所以它是一个由 NonfairSync 和 FairSync 共用的方法,下面来分析一下该方法的实现。

protected final boolean tryRelease(int releases) {
    // 将当前 state 记录的重入次数减 1
    int c = this.getState() - releases;
    // 如果当前持有锁的线程对象不是当前线程则抛出异常
    if (Thread.currentThread() != this.getExclusiveOwnerThread()) {
        throw new IllegalMonitorStateException();
    }
    boolean free = false;
    // 如果重入次数已经降为 0,则清空持有当前锁的线程对象
    if (c == 0) {
        free = true;
        this.setExclusiveOwnerThread(null);
    }
    // 更新当前锁的重入次数
    this.setState(c);
    return free;
}

尝试释放资源的过程本质上就是修改 state 字段值的过程,如果当前操作的线程是持有 ReentrantLock 锁的线程,则上述方法会将 state 值减 1,即将已重入次数减 1。如果修改后的 state 字段值为 0,则说明当前线程已经释放了持有的 ReentrantLock 锁,此时需要清除记录在 ReentrantLock 对象中的线程 Thread 对象。

公平锁

本小节我们来分析一下公平锁 FairSync 的实现机制,这里的公平本质上是指公平的获取锁资源,所以主要的区别体现在加锁的过程,即 ReentrantLock#lock 方法。

前面我们在分析 NonfairSync 时看到,NonfairSync 在加锁时首先会基于 CAS 尝试将 state 值由 0 改为 1,失败的情况下才会继续调用 AbstractQueuedSynchronizer#acquire 方法等待获取资源,并且在同步队列中等待期间仍然会在 state 为 0 时抢占获取锁资源。

FairSync 相对于 NonfairSync 的区别在于当 state 值为 0 时,即目标 ReentrantLock 锁此时未被任何线程持有的情况下,FairSync 并不会去抢占锁资源,而是检查同步队列中是否有排在前面等待获取锁资源的其它线程,如果有则让渡这些排在前面的线程优先获取锁资源。

下面来看一下 FairSync#lock 方法的实现,该方法只是简单的将获取锁资源操作委托给 AQS 的 AbstractQueuedSynchronizer#acquire 方法执行,所以我们需要重点关注一下模板方法 FairSync#tryAcquire 的实现:

protected final boolean tryAcquire(int acquires) {
        // 获取当前线程对象
        final Thread current = Thread.currentThread();
        // 获取当前 state 值
        int c = this.getState();
        if (c == 0) {
            // state 为 0,表示目标锁当前未被持有,先检查是否有阻塞等待当前锁的线程,如果没有再尝试获取锁
            if (!this.hasQueuedPredecessors() && this.compareAndSetState(0, acquires)) {
                this.setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 如果当前已经持有锁的线程已经是当前线程,则修改已重入次数加 1
        else if (current == this.getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) {
                throw new Error("Maximum lock count exceeded");
            }
            this.setState(nextc);
            return true;
        }
        return false;
    }
}

上述方法的执行流程与 NonfairSync 中的相关实现大同小异,主要区别在于当 state 值为 0 时,FairSync 会调用 AbstractQueuedSynchronizer#hasQueuedPredecessors 检查当前同步队列中是否还有等待获取锁资源的其它线程,如果存在则优先让这些线程获取锁资源,并将自己加入到同步队列中排队等待。

总结

本文我们通过一个 3 线程交替打印的程序演示了 ReentrantLock 的基本使用,并一起分析了 ReentrantLock 的实现机制。因为基于 AQS 实现,所以大部分的操作已经由 AQS 完成,ReentrantLock 只需要关注自身定制化逻辑即可,整体实现要简单了很多。

理解了 ReentrantLock 的实现机制,应该会对 AQS 的设计与实现有更进一步的认识。

最后留两个小思考题:

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8