提到并发编程,当然离不开ReentrantLock。但祥说清楚ReentrantLock,又必须先搞明白AQS,所以,今天我们就来详细的聊聊这个AQS。
请耐心看,慢慢看,文章比较长~
AbstractQueuedSynchronizer 简称 「AQS」,可能我们几乎不会直接去使用它,但它却是 JUC 的核心基础组件,支撑着 java 锁和同步器的实现,例如 ReentrantLock
、ReentrantReadWriteLock
、CountDownLatch
,以及 Semaphore
等。大神 「Doug Lea」 在设计 JUC 包时希望能够抽象一个基础且通用的组件以支撑上层模块的实现,AQS 应运而生。
「AQS」 本质上是一个 FIFO 的双向队列,线程被包装成结点的形式,基于自旋机制在队列中等待获取资源(这里的资源可以简单理解为对象锁)。AQS 在设计上实现了两类队列,即 同步队列 和 条件队列 ,其中同步队列服务于线程阻塞等待获取资源,而条件队列则服务于线程因某个条件不满足而进入等待状态。条件队列中的线程实际上已经获取到了资源,但是没有能够继续执行下去的条件,所以被打入条件队列并释放持有的资源,以让渡其它线程执行,如果未来某个时刻条件得以满足,则该线程会被从条件队列转移到同步队列,继续参与竞争资源,以继续向下执行。
本文我们主要分析 AQS 的设计与实现,包括 LockSupport 工具类、同步队列、条件队列,以及 AQS 资源获取和释放的通用过程。
「AQS」 采用模板方法设计模式,具体获取资源和释放资源的过程都交由子类实现,对于这些方法的分析将留到后面分析具体子类的文章中再展开。
LockSupport 工具类是 JUC 的基础组件,主要作用是用来阻塞和唤醒线程,底层依赖于 Unsafe
类实现。LockSupport 主要定义了两类方法:park 和 unpark,其中 park 方法用于阻塞当前线程,而 unpark 方法用于唤醒处于阻塞状态的指定线程。
下面的示例演示了 park
和unpark
方法的基本使用:
Thread thread = new Thread(() -> {
System.out.println("Thread start: " + Thread.currentThread().getName());
LockSupport.park(); // 阻塞自己
System.out.println("Thread end: " + Thread.currentThread().getName());
});
thread.setName("A");
thread.start();
System.out.println("Main thread sleep 3 second: " + Thread.currentThread().getId());
TimeUnit.SECONDS.sleep(3);
LockSupport.unpark(thread); // 唤醒线程 A
线程 A 在启动之后调用了 LockSupport#park
方法将自己阻塞,主线程在休息 3 秒之后调用 LockSupport#unpark
方法线程 A 唤醒。运行结果:
Thread start: A
Main thread sleep 3 second: 1
Thread end: A
LockSupport 针对 park 方法提供了多种实现,如下:
public static void park()
public static void park(Object blocker)
public static void parkNanos(long nanos)
public static void parkNanos(Object blocker, long nanos)
public static void parkUntil(long deadline)
public static void parkUntil(Object blocker, long deadline)
由方法命名不难看出,parkNanos 和 parkUntil 都属于 park 方法的超时版本,区别在于 parkNanos 方法接收一个纳秒单位的时间值,用于指定阻塞的时间长度,例如当设置 nanos=3000000000 时,线程将阻塞 3 秒后苏醒,而 parkUntil 方法则接收一个时间戳,参数 deadline 用于指定阻塞的到期时间。
所有的 park 方法都提供了包含 Object blocker 参数的重载版本,参数 blocker 指代导致当前线程阻塞等待的锁对象,方便问题排查和系统监控,而在 LockSupport 最开始被设计时却忽视了这一点,导致在线程 dump 时无法提供阻塞对象的相关信息,这一点在 java 6 中得以改进。实际开发中如果使用到了 LockSupport 工具类,推荐使用带 blocker 参数的版本。
下面以 LockSupport#park(java.lang.Object)
方法为例来看一下具体的实现,如下:
public static void park(Object blocker) {
// 获取当前线程对象
Thread t = Thread.currentThread();
// 记录当前线程阻塞等待的锁对象(设置线程对象的 parkBlocker 为参数指定的 blocker 对象)
setBlocker(t, blocker);
// 阻塞线程
UNSAFE.park(false, 0L);
// 线程恢复运行,清除 parkBlocker 参数记录的锁对象
setBlocker(t, null);
}
具体实现比较简单,阻塞线程的操作依赖于 Unsafe 类实现。上述方法会调用 LockSupport#setBlocker
方法基于 Unsafe 类将参数指定的 blocker 对象记录到当前线程对象的 Thread#parkBlocker
字段中,然后进入阻塞状态,并在被唤醒之后清空对应的 Thread#parkBlocker 字段。
当一个线程调用 park 方法进入阻塞状态之后,会在满足以下 3 个条件之一时从阻塞状态中苏醒:
线程在从 park 方法中返回时并不会携带具体的返回原因,调用者需要自行检测,例如再次检查之前调用 park 方法的条件是否仍然满足以予以推测。
方法 LockSupport#unpark
的实现同样基于 Unsafe 类实现,不同于 park 的多版本实现,LockSupport 针对 unpark 方法仅提供了单一实现,如下:
public static void unpark(Thread thread) {
if (thread != null) {
UNSAFE.unpark(thread);
}
}
需要注意的一点是,如果事先针对某个线程调用了 unpark 方法,则该线程继续调用 park 方法并不会进入阻塞状态,而是会立即返回,并且 park 方法是不可重入的。
同步队列的作用在于管理竞争资源的线程,当一个线程竞争资源失败会被记录到同步队列的末端,并以自旋的方式循环检查能够成功获取到资源。AQS 的同步队列基于 「CLH」(Craig, Landin, and Hagersten
) 锁思想进行设计和实现。CLH 锁是一种基于链表的可扩展、高性能,且具备公平性的自旋锁。线程以链表结点的形式进行组织,在等待期间相互独立的执行自旋,并不断轮询前驱结点的状态,如果发现前驱结点上的线程释放了资源则尝试获取。
CLH 锁是 AQS 队列同步器实现的基础,AQS 以内部类 Node 的形式定义了同步队列结点,包括下一小节介绍的条件队列,同样以 Node 定义结点。Node 的字段定义如下:
static final class Node {
/** 模式定义 */
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
/** 线程状态 */
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
/** 线程等待状态 */
volatile int waitStatus;
/** 前驱结点 */
volatile Node prev;
/** 后置结点 */
volatile Node next;
/** 持有的线程对象 */
volatile Thread thread;
/** 对于独占模式而言,指向下一个处于 CONDITION 等待状态的结点;对于共享模式而言,则为 SHARED 结点 */
Node nextWaiter;
// ... 省略方法定义
}
由上述字段定义可以看出,位于 CLH 链表中的线程以 2 种模式在等待资源,即 SHARED 和 EXCLUSIVE,其中 SHARED 表示共享模式,而 EXCLUSIVE 表示独占模式。共享模式与独占模式的主要区别在于,同一时刻独占模式只能有一个线程获取到资源,而共享模式在同一时刻可以有多个线程获取到资源。典型的场景就是读写锁,读操作可以有多个线程同时获取到读锁资源,而写操作同一时刻只能有一个线程获取到写锁资源,其它线程在尝试获取资源时都会被阻塞。
AQS 的 CLH 锁为处于 CLH 链表中的线程定义了 4 种状态,包括 CANCELLED
、SIGNAL
、CONDITION
,以及 PROPAGATE
,并以 Node#waitStatus
字段进行记录。这 4 种状态的含义分别为:
CANCELLED
:表示当前线程处于取消状态,一般是因为等待超时或者被中断,处于取消状态的线程不会再参与到竞争中,并一直保持该状态。__SIGNAL__
:表示当前结点后继结点上的线程正在等待被唤醒,如果当前线程释放了持有的资源或者被取消,需要唤醒后继结点上的线程。CONDITION
:表示当前线程正在等待某个条件,当某个线程在调用了 Condition#signal 方法后,当前结点将会被从条件队列转移到同步队列中,参与竞争资源。PROPAGATE
:处于该状态的线程在释放共享资源,或接收到释放共享资源的信号时需要通知后继结点,以防止通知丢失。一个结点在被创建时,字段 Node#waitStatus 的初始值为 0,表示结点上的线程不位于上述任何状态。
Node 类在方法定义上除了基本的构造方法外,仅定义了 Node#isShared 和 Node#predecessor
两个方法,其中前者用于返回当前结点是否以共享模式在等待,后者用于返回当前结点的前驱结点。
介绍完了队列结点的定义,那么同步队列具体如何实现呢?这还需要依赖于 AbstractQueuedSynchronizer 类中的两个字段定义,即:
private transient volatile Node head;
private transient volatile Node tail;
其中 head 表示同步队列的头结点,而 tail 则表示同步队列的尾结点,具体组织形式如下图:
当调用 AQS 的 acquire 方法获取资源时,如果资源不足则当前线程会被封装成 Node 结点添加到同步队列的末端,头结点 head 用于记录当前正在持有资源的线程结点,而 head 的后继结点就是下一个将要被调度的线程结点,当 release 方法被调用时,该结点上的线程将被唤醒,继续获取资源。
关于同步队列结点入队列、出队列的实现先不展开,留到后面分析 AQS 资源获取与释放的过程时一并分析。
除了上面介绍的同步队列,在 AQS 中还定义了一个条件队列。内部类 ConditionObject 实现了条件队列的组织形式,包含一个起始结点(firstWaiter)和一个末尾结点(lastWaiter),并同样以上面介绍的 Node 类定义结点,如下:
public class ConditionObject implements Condition, Serializable {
/** 指向条件队列中的起始结点 */
private transient Node firstWaiter;
/** 指向条件队列的末尾结点 */
private transient Node lastWaiter;
// ... 省略方法定义
}
前面在分析 Node 内部类的时候,可以看到 Node 类还定义了一个 Node#nextWaiter
字段,用于指向条件队列中的下一个等待结点。由此我们可以描绘出条件队列的组织形式如下:
ConditionObject
类实现了 Condition 接口,该接口定义了与 Lock 锁相关的线程通信方法,主要分为 await
和 signal
两大类。
当线程调用 await 方法时,该线程会被包装成结点添加到条件队列的末端,并释放持有的资源。当条件得以满足时,方法 signal 可以将条件队列中的一个或全部的线程结点从条件队列转移到同步队列以参与竞争资源。应用可以创建多个 ConditionObject 对象,每个对象都对应一个条件队列,对于同一个条件队列而言,其中的线程所等待的条件是相同的。
Condition 接口的定义如下:
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
下面来分析一下 ConditionObject 类针对 Condition 接口方法的实现,首先来看一下 ConditionObject#await
方法,该方法用于将当前线程添加到条件队列中进行等待,同时支持响应中断。方法实现如下:
public final void await() throws InterruptedException {
if (Thread.interrupted()) {
// 立即响应中断
throw new InterruptedException();
}
// 将当前线程添加到等待队列末尾,等待状态为 CONDITION
Node node = this.addConditionWaiter();
// 释放当前线程持有的资源
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // 如果当前结点位于条件队列中,则循环
// 阻塞当前线程
LockSupport.park(this);
// 如果线程在阻塞期间被中断,则退出循环
if ((interruptMode = this.checkInterruptWhileWaiting(node)) != 0) {
break;
}
}
// 如果在同步队列中等待期间被中断,且之前的中断状态不为 THROW_IE
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
interruptMode = REINTERRUPT;
}
if (node.nextWaiter != null) {
// 清除条件队列中所有状态不为 CONDITION 的结点
this.unlinkCancelledWaiters();
}
// 如果等待期间被中断,则响应中断
if (interruptMode != 0) {
this.reportInterruptAfterWait(interruptMode);
}
}
因为 ConditionObject#await
方法支持响应中断,所以在方法一开始会先检查一下当前线程是否被中断,如果是则抛出 InterruptedException 异常,否则继续将当前线程加入到条件队列中进行等待。整体执行流程可以概括为:
ConditionObject 定义了两种中断响应方式,即:REINTERRUPT
和 THROW_IE
。如果是 REINTERRUPT
,则线程会调用 Thread#interrupt
方法中断自己;如果是 THROW_IE
,则线程会直接抛出 InterruptedException 异常。
下面继续分析一下支撑 ConditionObject#await
运行的其它几个方法,包括 addConditionWaiter
、fullyRelease
、isOnSyncQueue
,以及 unlinkCancelledWaiters
。
方法 ConditionObject#addConditionWaiter
用于将当前线程包装成 Node 结点对象添加到条件队列的末端,期间会执行清除条件队列中处于取消状态(等待状态不为 CONDITION)的线程结点。方法实现如下:
private Node addConditionWaiter() {
// 获取条件队列的末尾结点
Node t = lastWaiter;
// 如果末尾结点状态不为 CONDITION,表示对应的线程已经取消了等待,需要执行清理操作
if (t != null && t.waitStatus != Node.CONDITION) {
// 清除条件队列中所有状态不为 CONDITION 的结点
this.unlinkCancelledWaiters();
t = lastWaiter;
}
// 构建当前线程对应的 Node 结点,等待状态为 CONDITION,并添加到条件队列末尾
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null) {
firstWaiter = node;
} else {
t.nextWaiter = node;
}
lastWaiter = node;
return node;
}
将当前线程对象添加到条件队列中的过程本质上是一个简单的链表插入操作,在执行插入操作之前,上述方法会先对条件队列执行一遍清理操作,清除那些状态不为 CONDITION 的结点。具体实现位于 ConditionObject#unlinkCancelledWaiters
方法中:
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null; // 记录上一个不被删除的结点
while (t != null) {
Node next = t.nextWaiter;
// 如果结点上的线程等待状态不为 CONDITION,则删除对应结点
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null) {
firstWaiter = next;
} else {
trail.nextWaiter = next;
}
if (next == null) {
lastWaiter = trail;
}
} else {
trail = t;
}
t = next;
}
}
方法 AbstractQueuedSynchronizer#fullyRelease
用于释放当前线程持有的资源,这也是非常容易理解的,毕竟当前线程即将进入等待状态,如果持有的资源不被释放,将可能导致程序最终被饿死,或者死锁。方法的实现如下:
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 获取当前线程的同步状态,可以理解为持有的资源数量
int savedState = this.getState();
// 尝试释放当前线程持有的资源
if (this.release(savedState)) {
failed = false;
return savedState;
} else {
// 释放资源失败
throw new IllegalMonitorStateException();
}
} finally {
if (failed) {
// 如果释放资源失败,则取消当前线程
node.waitStatus = Node.CANCELLED;
}
}
}
如果资源释放失败,则上述方法会将当前线程的状态设置为 CANCELLED,以退出等待状态。
方法 AbstractQueuedSynchronizer
#isOnSyncQueue
用于检测当前结点是否位于同步队列中,方法实现如下:
final boolean isOnSyncQueue(Node node) {
// 如果结点位于等待队列,或是头结点则返回 false
if (node.waitStatus == Node.CONDITION || node.prev == null) {
return false;
}
// If has successor, it must be on queue
if (node.next != null) {
return true;
}
/*
* node.prev can be non-null, but not yet on queue because the CAS to place it on queue can fail.
* So we have to traverse from tail to make sure it actually made it. It will always be near the tail in calls to this method,
* and unless the CAS failed (which is unlikely), it will be there, so we hardly ever traverse much.
*/
// 从后往前检测目标结点是否位于同步队列中
return this.findNodeFromTail(node);
}
如果一个线程所等待的条件被满足,则触发条件满足的线程会将等待该条件的一个或全部线程结点从条件队列转移到同步队列,此时,这些线程将从 ConditionObject#await 方法中退出,以参与竞争资源。
方法 ConditionObject
#awaitNanos
、ConditionObject
#awaitUntil
和 ConditionObject
#await(long, TimeUnit)
在上面介绍的 ConditionObject
#await
方法的基础上引入了超时机制,当一个线程在条件队列中等待的时间超过设定值时,线程结点将被从条件队列转移到同步队列,参与竞争资源。其它执行过程与 ConditionObject#await
方法相同,故不再展开。
下面来分析一下 ConditionObject#awaitUninterruptibly
方法,由方法命名可以看出该方法相对于 ConditionObject#await
方法的区别在于在等待期间不响应中断。方法实现如下:
public final void awaitUninterruptibly() {
// 将当前线程添加到等待队列末尾,等待状态为 CONDITION
Node node = this.addConditionWaiter();
// 释放当前线程持有的资源
int savedState = fullyRelease(node);
boolean interrupted = false;
// 如果当前结点位于条件队列中,则循环
while (!isOnSyncQueue(node)) {
// 阻塞当前线程
LockSupport.park(this);
if (Thread.interrupted()) {
// 标识线程等待期间被中断,但不立即响应
interrupted = true;
}
}
// 自旋获取资源,返回 true 则说明等待期间被中断过
if (acquireQueued(node, savedState) || interrupted) {
// 响应中断
selfInterrupt();
}
}
如果线程在等待期间被中断,则上述方法会用一个字段进行记录,并在最后集中处理,而不会因为中断而退出等待状态。
调用 await 方法会将线程对象自身加入到条件队列中进行等待,而 signal 通知方法则用于将一个或全部的等待线程从条件队列转移到同步队列,以参与竞争资源。ConditionObject 定义了两个通知方法:signal
和signalAll
,前者用于将条件队列的头结点(也就是等待时间最长的结点)从条件队列转移到同步队列,后者用于将条件队列中所有处于等待状态的结点从条件队列转移到同步队列。下面分别来分析一下这两个方法的实现。
方法ConditionObject#signal
的实现如下:
public final void signal() {
// 先检测当前线程是否获取到了锁,否则不允许继续执行
if (!isHeldExclusively()) {
throw new IllegalMonitorStateException();
}
// 获取条件队列头结点,即等待时间最长的结点
Node first = firstWaiter;
if (first != null) {
// 将头结点从条件队列转移到同步队列,参与竞争资源
this.doSignal(first);
}
}
调用ConditionObject#signal
方法的线程必须位于临界区,也就是必须先持有独占锁,所以上述方法一开始会对这一条件进行校验,方法 AbstractQueuedSynchronizer#isHeldExclusively
是一个模板方法,交由子类来实现。如果满足执行条件,则上述方法会调用 ConditionObject#doSignal 方法将条件队列的头结点从条件队列转移到同步队列。
private void doSignal(Node first) {
// 从前往后遍历,直到遇到第一个不为 null 的结点,并将其从条件队列转移到同步队列
do {
if ((firstWaiter = first.nextWaiter) == null) {
lastWaiter = null;
}
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
// AbstractQueuedSynchronizer#transferForSignal
final boolean transferForSignal(Node node) {
// 更新当前结点的等待状态:CONDITION -> 0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// 更新失败,说明对应的结点上的线程已经被取消
return false;
}
/*
* Splice onto queue and try to set waitStatus of predecessor to indicate that thread is (probably) waiting.
* If cancelled or attempt to set waitStatus fails, wake up to resync (in which case the waitStatus can be transiently and harmlessly wrong).
*/
// 将结点添加到同步队列末端,并返回该结点的前驱结点
Node p = this.enq(node);
int ws = p.waitStatus;
// 如果前驱结点被取消,或者设置前驱结点的状态为 SIGNAL 失败,则唤醒当前结点上的线程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) {
LockSupport.unpark(node.thread);
}
return true;
}
方法 ConditionObject#doSignal
会从前往后遍历条件队列,寻找第一个不为 null 的结点,并应用 AbstractQueuedSynchronizer#transferForSignal
方法尝试将其从条件队列转移到同步队列。
在入同步队列之前,方法 AbstractQueuedSynchronizer#transferForSignal
会基于 CAS 机制清除结点的 CONDITION 状态,如果清除失败则说明该结点上的线程已被取消,此时 ConditionObject#doSignal
方法会继续寻找下一个可以被唤醒的结点。如果清除结点状态成功,则接下来会将该结点添加到同步队列的末端,同时依据前驱结点的状态决定是否唤醒当前结点上的线程。
继续来看 ConditionObject#signalAll 方法的实现,相对于上面介绍的 ConditionObject#signal 方法,该方法的特点在于它会唤醒条件队列中所有不为 null 的等待结点。方法实现如下:
public final void signalAll() {
if (!isHeldExclusively()) {
// 先检测当前线程是否获取到了锁,否则不允许继续执行
throw new IllegalMonitorStateException();
}
// 获取条件队列头结点
Node first = firstWaiter;
if (first != null) {
// 将所有结点从条件队列转移到同步队列,参与竞争资源
this.doSignalAll(first);
}
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
实际上理解了 ConditionObject#doSignal
的运行机制,再理解 ConditionObject#signalAll
的运行机制也是水到渠成的事情。
前面的小节我们分析了 LockSupport 工具类,以及 AQS 同步队列和条件队列的设计与实现,这些都是支撑 AQS 运行的基础组件,本小节我们将正式开始分析 AQS 的实现机制。
AQS 对应的 AbstractQueuedSynchronizer
实现类,在属性定义上主要包含 4 个字段(如下),其中 exclusiveOwnerThread 由父类 AbstractOwnableSynchronizer 继承而来,用于记录当前持有独占锁的线程对象,而 head 和 tail 字段分别指向同步队列的头结点和尾结点:
private transient Thread exclusiveOwnerThread;
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
字段 state 用于描述同步状态,对于不同的实现类来说具备不同的用途:
AbstractQueuedSynchronizer 是一个抽象类,在方法设计上引入了模板方法设计模式,下面的代码块中列出了所有需要子类依据自身运行机制针对性实现的模板方法:
protected boolean tryAcquire(int arg)
protected boolean tryRelease(int arg)
protected int tryAcquireShared(int arg)
protected boolean tryReleaseShared(int arg)
protected boolean isHeldExclusively()
这里先简单说明一下各个方法的作用,具体实现留到后面分析各个基于 AQS 实现组件的文章中再进一步分析:
针对独占模式获取资源,AbstractQueuedSynchronizer 定义了多个版本的 acquire 方法实现,包括:acquire、acquireInterruptibly,以及 tryAcquireNano
s,其中 acquireInterruptibly
是 acquire 的中断版本,在等待获取资源期间支持响应中断请求,tryAcquireNanos 除了支持响应中断以外,还引入了超时等待机制。
下面主要分析一下 AbstractQueuedSynchronizer#acquire
的实现,理解了该方法的实现机制,也就自然而然理解了另外两个版本的实现机制。方法 AbstractQueuedSynchronizer#acquire
的实现如下:
public final void acquire(int arg) {
if (!this.tryAcquire(arg) // 尝试获取资源
// 如果获取资源失败,则将当前线程加入到同步队列的末端(独占模式),并基于自旋机制等待获取资源
&& this.acquireQueued(this.addWaiter(Node.EXCLUSIVE), arg)) {
// 等待获取资源期间曾被中断过,在获取资源成功之后再响应中断
selfInterrupt();
}
}
方法 AbstractQueuedSynchronizer#tryAcquire
的功能在前面已经简单介绍过了,用于尝试获取资源,如果获取资源失败则会将当前线程添加到同步队列中,基于自旋机制等待获取资源。
方法AbstractQueuedSynchronizer#addWaiter
用于将当前线程对象封装成结点添加到同步队列末端,并最终返回线程结点对象:
private Node addWaiter(Node mode) {
// 为当前线程创建结点对象
Node node = new Node(Thread.currentThread(), mode);
// 基于 CAS 机制尝试快速添加结点到同步队列末端
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (this.compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 快速添加失败,继续尝试将该结点添加到同步队列末端,如果同步队列未被初始化则执行初始化
this.enq(node);
// 返回当前线程对应的结点对象
return node;
}
上述方法在添加结点的时候,如果同步队列已经存在,则尝试基于 CAS 操作快速将当前结点添加到同步队列末端。如果添加失败,或者队列不存在,则需要再次调用 AbstractQueuedSynchronizer#enq
方法执行添加操作,该方法在判断队列不存在时会初始化同步队列,然后基于 CAS 机制尝试往同步队列末端插入线程结点。方法实现如下:
private Node enq(final Node node) {
for (; ; ) {
// 获取同步队列末尾结点
Node t = tail;
// 如果结点不存在,则初始化
if (t == null) { // Must initialize
if (this.compareAndSetHead(new Node())) {
tail = head;
}
} else {
// 往末尾追加
node.prev = t;
if (this.compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
完成了结点的入同步队列操作,接下来会调用 AbstractQueuedSynchronizer#acquireQueued
方法基于自旋机制等待获取资源,在等待期间并不会响应中断,而是记录中断标志,等待获取资源成功后延迟响应。方法实现如下:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false; // 标记自旋过程中是否被中断
// 基于自旋机制等待获取资源
for (; ; ) {
// 获取前驱结点
final Node p = node.predecessor();
// 如果前驱结点为头结点,说明当前结点是排在同步队列最前面,可以尝试获取资源
if (p == head && this.tryAcquire(arg)) {
// 获取资源成功,更新头结点
this.setHead(node); // 头结点一般记录持有资源的线程结点
p.next = null; // help GC
failed = false;
return interrupted; // 自旋过程中是否被中断
}
// 如果还未轮到当前结点,或者获取资源失败
if (shouldParkAfterFailedAcquire(p, node) // 判断是否需要阻塞当前线程
&& this.parkAndCheckInterrupt()) { // 如果需要,则进入阻塞状态,并在苏醒时检查中断状态
// 标识等待期间被中断
interrupted = true;
}
}
} finally {
// 尝试获取资源失败,说明执行异常,取消当前结点获取资源的进程
if (failed) {
this.cancelAcquire(node);
}
}
}
上述方法会循环检测当前结点是否已经排在同步队列的最前端,如果是则调用 AbstractQueuedSynchronizer#tryAcquire
方法尝试获取资源,具体获取资源的过程由子类实现。自旋期间如果还未轮到调度当前线程结点,或者尝试获取资源失败,则会调用 AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire
方法检测是否需要阻塞当前线程,具体判定的过程依赖于前驱结点的等待状态,实现如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱结点状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) {
// 前驱结点状态为 SIGNAL,说明当前结点需要被阻塞
return true;
}
if (ws > 0) {
// 前驱结点处于取消状态,则一直往前寻找处于等待状态的结点,并排在其后面
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 前驱结点的状态为 0 或 PROPAGATE,但是当前结点需要一个被唤醒的信号,
* 所以基于 CAS 将前驱结点等待状态设置为 SIGNAL,在阻塞之前,调用者需要重试以再次确认不能获取到资源。
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
上述方法首先会获取前驱结点的等待状态,并依据具体的状态值进行决策:
如果前驱结点等待状态为 SIGNAL
,则说明当前结点需要被阻塞,所以直接返回 true;否则,如果前驱结点的等待状态大于 0(即处于取消状态),则一直往前寻找未被取消的结点,并将当前结点排在其后,这种情况下直接返回 false,再次尝试获取一次资源;否则,前驱结点的状态为 0 或 PROPAGATE(不可能为 CONDITION 状态,因为当前处于同步队列),因为当前结点需要一个唤醒信号,所以修改前驱结点的状态为 SIGNAL,这种情况下同样返回 false,以再次确认不能获取到资源。如果上述检查返回 true,则接下来会调用 AbstractQueuedSynchronizer#parkAndCheckInterrupt
方法,基于 LockSupport 工具阻塞当前线程,并在线程苏醒时检查中断状态。如果期间被中断过则记录中断标记,而不立即响应,直到成功获取到资源,或者期间发生异常退出自旋。方法 AbstractQueuedSynchronizer#acquireQueued
最终会返回这一中断标记,并在外围进行响应。
如果在自旋期间发生异常,则上述方法会执行 AbstractQueuedSynchronizer#cancelAcquire 以取消当前结点等待获取资源的进程,包括设置结点的等待状态为 CANCELLED,唤醒后继结点等。
针对独占模式释放资源,AbstractQueuedSynchronizer 定义了单一实现,即 AbstractQueuedSynchronizer#release
方法,该方法本质上是一个调度的过程,具体释放资源的操作交由 tryRelease 方法完成,由子类实现。
方法AbstractQueuedSynchronizer#release
实现如下:
public final boolean release(int arg) {
// 尝试释放资源
if (this.tryRelease(arg)) {
Node h = head;
// 如果释放资源成功,则尝试唤醒后继结点
if (h != null && h.waitStatus != 0) {
this.unparkSuccessor(h);
}
return true;
}
return false;
}
如果 tryRelease 释放资源成功,则上述方法会尝试唤醒同步队列中由后往前距离头结点最近的一个结点上的线程。方法 AbstractQueuedSynchronizer#unparkSuccessor 的实现如下:
private void unparkSuccessor(Node node) {
// 获取当前结点状态
int ws = node.waitStatus;
if (ws < 0) {
// 如果当前结点未被取消,则基于 CAS 更新结点等待状态为 0
compareAndSetWaitStatus(node, ws, 0);
}
/*
* Thread to unpark is held in successor, which is normally just the next node.
* But if cancelled or apparently null, traverse backwards from tail to find the actual non-cancelled successor.
*/
Node s = node.next; // 获取后继结点
// 如果后继结点为 null,或者被取消
if (s == null || s.waitStatus > 0) {
s = null;
// 从后往前寻找距离当前结点最近的一个未被取消的线程结点
for (Node t = tail; t != null && t != node; t = t.prev) {
if (t.waitStatus <= 0) {
s = t;
}
}
}
// 唤醒结点上的线程
if (s != null) {
LockSupport.unpark(s.thread);
}
}
选举待唤醒线程结点的过程被设计成从后往前遍历,寻找距离当前结点最近的未被取消的结点,并调用 LockSupport 工具类唤醒结点上的线程。
那 为什么要设计成从后往前遍历同步队列呢 ?在 Doug Lea 大神的论文 The java.util.concurrent Synchronizer Framework 中给出了答案,摘录如下:
An AbstractQueuedSynchronizer queue node contains a next link to its successor. But because there are no applicable techniques for lock-free atomic insertion of double-linked listnodes using compareAndSet, this link is not atomically set as part of insertion; it is simply assigned: pred.next = node; after the i
nsertion. This is reflected in all usages. The next link is treated only as an optimized path.
If a node’s successor does not appear to exist (or appears to be cancelled) via its next field, it is a
lways possible to start at the tail of the list and traverse backwards using the pred field to accurately check if therereally is one.
也就说对于双向链表而言,没有不加锁的原子手段可以保证构造双向指针的线程安全性。回到代码中,我们回顾一下往同步队列中添加结点的执行过程,如下(其中 pred 是末尾结点,而 node 是待插入的结点):
node.prev = pred;
if (this.compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
上述方法会将 node 结点的 prev 指针指向 pred 结点,而将 pred 的 next 指针指向 node 结点的过程需要建立在基于 CAS 成功将 node 设置为末端结点的基础之上,如果这一过程失败则 next 指针将会断掉,而选择从后往前遍历则始终能够保证遍历到头结点。
共享获取资源 针对共享模式获取资源,AbstractQueuedSynchronizer 同样定义了多个版本的 acquire 方法实现,包括:acquireShared、acquireSharedInterruptibly,以及 tryAcquireSharedNanos,其中 acquireSharedInterruptibly 是 acquireShared 的中断版本,在等待获取资源期间支持响应中断请求,tryAcquireSharedNanos 除了支持响应中断以外,还引入了超时等待机制。下面同样主要分析一下 AbstractQueuedSynchronizer#acquireShared 的实现,理解了该方法的实现机制,也就自然而然理解了另外两个版本的实现机制。
方法 AbstractQueuedSynchronizer#acquireShared 的实现如下:
public final void acquireShared(int arg) {
// 返回负数表示获取资源失败
if (this.tryAcquireShared(arg) < 0) {
// 将当前线程添加到条件队列,基于自旋等待获取资源
this.doAcquireShared(arg);
}
}
private void doAcquireShared(int arg) {
// 将当前线程加入条件队列末端,并标记为共享模式
final Node node = this.addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false; // 标记自旋过程中是否被中断
for (; ; ) {
// 获取前驱结点
final Node p = node.predecessor();
// 如果前驱结点为头结点,说明当前结点是排在同步队列最前面,可以尝试获取资源
if (p == head) {
// 尝试获取资源
int r = this.tryAcquireShared(arg);
if (r >= 0) {
// 获取资源成功,设置自己为头结点,并尝试唤醒后继结点
this.setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted) {
selfInterrupt();
}
failed = false;
return;
}
}
// 如果还未轮到当前结点,或者获取资源失败
if (shouldParkAfterFailedAcquire(p, node) // 判断是否需要阻塞当前线程
&& this.parkAndCheckInterrupt()) { // 如果需要,则进入阻塞状态,并在苏醒时检查中断状态
// 标识等待期间被中断
interrupted = true;
}
}
} finally {
// 尝试获取资源失败,说明执行异常,取消当前结点获取资源的进程
if (failed) {
this.cancelAcquire(node);
}
}
}
上述方法与 AbstractQueuedSynchronizer#acquire 的实现逻辑大同小异,区别在于线程在被封装成结点之后,是以共享(SHARED)模式在同步队列中进行等待。这里我们重点关注一下 AbstractQueuedSynchronizer#setHeadAndPropagate 方法的实现,当结点上的线程成功获取到资源会触发执行该方法,以尝试唤醒后继结点。实现如下:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 记录之前的头结点
this.setHead(node); // 头结点一般记录持有资源的线程结点
/*
* 如果满足以下条件,尝试唤醒后继结点:
*
* 1. 存在剩余可用的资源;
* 2. 后继结点处于等待状态,或后继结点为空
*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause unnecessary wake-ups,
* but only when there are multiple racing acquires/releases, so most need signals now or soon anyway.
*/
if (propagate > 0 // 存在剩余可用的资源
|| h == null || h.waitStatus < 0 // 此时 h 是之前的头结点
|| (h = head) == null || h.waitStatus < 0) { // 此时 h 已经更新为当前头结点
Node s = node.next;
// 如果后继结点以共享模式在等待,或者后继结点未知,则尝试唤醒后继结点
if (s == null || s.isShared()) {
this.doReleaseShared();
}
}
}
因为当前结点已经获取到资源,所以需要将当前结点记录到头结点中。此外,如果满足以下 2 种情况之一,还需要唤醒后继结点:
参数 propagate > 0,即存在可用的剩余资源;前任头结点或当前头结点不存在,或指明后继结点需要被唤醒。如果满足上述条件之一,且后继结点状态未知或以共享模式在等待,则调用 AbstractQueuedSynchronizer#doReleaseShared 方法唤醒后继结点,关于该方法的实现留到下一小节进行分析。
针对共享模式释放资源,AbstractQueuedSynchronizer 同样定义了单一实现,即 AbstractQueuedSynchronizer#releaseShared 方法,该方法本质上也是一个调度的过程,具体释放资源的操作交由 tryReleaseShared 方法完成,由子类实现。方法 AbstractQueuedSynchronizer#releaseShared 实现如下:
public final boolean releaseShared(int arg) {
// 尝试释放资源
if (this.tryReleaseShared(arg)) {
// 释放资源成功,唤醒后继结点
this.doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other in-progress acquires/releases.
* This proceeds in the usual way of trying to unparkSuccessor of head if it needs signal.
* But if it does not, status is set to PROPAGATE to ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added while we are doing this.
* Also, unlike other uses of unparkSuccessor, we need to know if CAS to reset status fails, if so rechecking.
*/
for (; ; ) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果头结点状态为 SIGNAL,则在唤醒后继结点之前尝试清除当前结点的状态
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
// loop to recheck cases
continue;
}
// 唤醒后继结点
this.unparkSuccessor(h);
}
/*
* 如果后继结点暂时不需要被唤醒,则基于 CAS 尝试将目标结点的 waitStatus 由 0 修改为 PROPAGATE,
* 以保证后续由唤醒通知到来时,能够将通知传递下去
*/
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
// loop on failed CAS
continue;
}
}
// 如果头结点未变更,则说明期间持有锁的线程未发生变化,能够走到这一步说明前面的操作已经成功完成
if (h == head) {
break;
}
// 如果头结点发生变更,则说明期间持有锁的线程发生了变化,需要重试以保证唤醒动作的成功执行
}
}
如果释放资源成功,需要依据头结点当下等待状态分别处理:
如果头结点的等待状态为 SIGNAL,则表明后继结点需要被唤醒,在执行唤醒操作之前需要清除等待状态。如果头结点状态为 0,则表示后继结点不需要被唤醒,此时需要将结点状态修改为 PROPAGATE,以保证后续接收到唤醒通知时能够将通知传递下去。
本文我们分析了 AQS 的设计与实现。理解了 AQS 的运行机制也就理解了 java 的 Lock 锁是如何实现线程的阻塞、唤醒、等待和通知机制的,所以理解 AQS 也是我们后面分析 Lock 锁和同步器实现的基础。
下一篇分享AQS的实现组件 ReentrantLock,敬请期待。
来源:http://yli11.cn/XBmj9
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8