AbstractQueuedSynchronizer提供了一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架。该同步器(以下简称同步器)利用了一个int来表示状态,期望它能够成为实现大部分同步需求的基础。使用的方法是继承,子类通过继承同步器并需要实现它的方法来管理其状态,管理的方式就是通过类似acquire和release的方式来操纵状态。然而多线程环境中对状态的操纵必须确保原子性,因此子类对于状态的把握,需要使用这个同步器提供的以下三个方法对状态进行操作:
java.util.concurrent.locks.AbstractQueuedSynchronizer.getState() java.util.concurrent.locks.AbstractQueuedSynchronizer.setState(int) java.util.concurrent.locks.AbstractQueuedSynchronizer.compareAndSetState(int, int)
子类推荐被定义为自定义同步装置的内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干acquire之类的方法来供使用。该同步器即可以作为排他模式也可以作为共享模式,当它被定义为一个排他模式时,其他线程对其的获取就被阻止,而共享模式对于多个线程获取都可以成功。
同步器是实现锁的关键,利用同步器将锁的语义实现,然后在锁的实现中聚合同步器。 可以这样理解:锁的API是面向使用者的,它定义了与锁交互的公共行为,而每个锁需要完成特定的操作也是透过这些行为来完成的(比如:可以允许两个线程进行加锁,排除两个以上的线程),但是实现是依托给同步器来完成;同步器面向的是线程访问和资源控制,它定义了线程对资源是否能够获取以及线程的排队等操作。锁和同步器很好的隔离了二者所需要关注的领域,严格意义上讲,同步器可以适用于除了锁以外的其他同步设施上(包括锁)。 同步器的开始提到了其实现依赖于一个FIFO队列,那么队列中的元素Node就是保存着线程引用和线程状态的容器,每个线程对同步器的访问,都可以看做是队列中的一个节点。Node的主要包含以下成员变量:
Node { int waitStatus; Node prev; Node next; Node nextWaiter; Thread thread; }
以上五个成员变量主要负责保存该节点的线程引用,同步等待队列(以下简称sync队列)的前驱和后继节点,同时也包括了同步状态。
节点成为sync队列和condition队列构建的基础,在同步器中就包含了sync队列。同步器拥有三个成员变量:sync队列的头结点head、sync队列的尾节点tail和状态state。对于锁的获取,请求形成节点,将其挂载在尾部,而锁资源的转移(释放再获取)是从头部开始向后进行。对于同步器维护的状态state,多个线程对其的获取将会产生一个链式的结构。
实现自定义同步器时,需要使用同步器提供的getState()、setState()和compareAndSetState()方法来操纵状态的变迁。
实现这些方法必须是非阻塞而且是线程安全的,推荐使用该同步器的父类java.util.concurrent.locks.AbstractOwnableSynchronizer来设置当前的线程。 开始提到同步器内部基于一个FIFO队列,对于一个独占锁的获取和释放有以下伪码可以表示。 获取一个排他锁。
while(获取锁) { if (获取到) { 退出while循环 } else { if(当前线程没有入队列) { 那么入队列 } 阻塞当前线程 } } 释放一个排他锁。
if (释放成功) { 删除头结点 激活原头结点的后继节点 }
下面通过一个排它锁的例子来深入理解一下同步器的工作原理,而只有掌握同步器的工作原理才能够更加深入了解其他的并发组件。 排他锁的实现,一次只能一个线程获取到锁。
class Mutex implements Lock, java.io.Serializable { // 内部类,自定义同步器 private static class Sync extends AbstractQueuedSynchronizer { // 是否处于占用状态 protected boolean isHeldExclusively() { return getState() == 1; } // 当状态为0的时候获取锁 public boolean tryAcquire(int acquires) { assert acquires == 1; // Otherwise unused if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 释放锁,将状态设置为0 protected boolean tryRelease(int releases) { assert releases == 1; // Otherwise unused if (getState() == 0) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(0); return true; } // 返回一个Condition,每个condition都包含了一个condition队列 Condition newCondition() { return new ConditionObject(); } } // 仅需要将操作代理到Sync上即可 private final Sync sync = new Sync(); public void lock() { sync.acquire(1); } public boolean tryLock() { return sync.tryAcquire(1); } public void unlock() { sync.release(1); } public Condition newCondition() { return sync.newCondition(); } public boolean isLocked() { return sync.isHeldExclusively(); } public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } }
可以看到Mutex将Lock接口均代理给了同步器的实现。 使用方将Mutex构造出来之后,调用lock获取锁,调用unlock进行解锁。下面以Mutex为例子,详细分析以下同步器的实现逻辑。
实现分析 public final void acquire(int arg) 该方法以排他的方式获取锁,对中断不敏感,完成synchronized语义。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
上述逻辑主要包括:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 快速尝试在尾部添加 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } }
先行尝试在队尾添加;
如果尾节点已经有了,然后做如下操作:
如果队尾添加失败或者是第一个入队的节点。
如果是第1个节点,也就是sync队列没有初始化,那么会进入到enq这个方法,进入的线程可能有多个,或者说在addWaiter中没有成功入队的线程都将进入enq这个方法。
可以看到enq的逻辑是确保进入的Node都会有机会顺序的添加到sync队列中,而加入的步骤如下:
进入sync队列之后,接下来就是要进行锁的获取,或者说是访问控制了,只有一个线程能够在同一时刻继续的运行,而其他的进入等待状态。而每个线程都是一个独立的个体,它们自省的观察,当条件满足的时候(自己的前驱是头结点并且原子性的获取了状态),那么这个线程能够继续运行。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head &&tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
如上图所示,其中的判定退出队列的条件,判定条件是否满足和休眠当前线程就是完成了自旋spin的过程。
public final boolean release(int arg) 在unlock方法的实现中,使用了同步器的release方法。相对于在之前的acquire方法中可以得出调用acquire,保证能够获取到锁(成功获取状态),而release则表示将状态设置回去,也就是将资源释放,或者说将锁释放。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
private void unparkSuccessor(Node node) { // 将状态设置为同步状态 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * 获取当前节点的后继节点,如果满足状态,那么进行唤醒操作 * 如果没有满足状态,从尾部开始找寻符合要求的节点并将其唤醒 */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
上述逻辑主要包括,该方法取出了当前节点的next引用,然后对其线程(Node)进行了唤醒,这时就只有一个或合理个数的线程被唤醒,被唤醒的线程继续进行对资源的获取与争夺。 回顾整个资源的获取和释放过程: 在获取时,维护了一个sync队列,每个节点都是一个线程在进行自旋,而依据就是自己是否是首节点的后继并且能够获取资源; 在释放时,仅仅需要将资源还回去,然后通知一下后继节点并将其唤醒。 这里需要注意,队列的维护(首节点的更换)是依靠消费者(获取时)来完成的,也就是说在满足了自旋退出的条件时的一刻,这个节点就会被设置成为首节点。
protected boolean tryAcquire(int arg) tryAcquire是自定义同步器需要实现的方法,也就是自定义同步器非阻塞原子化的获取状态,如果锁该方法一般用于Lock的tryLock实现中,这个特性是synchronized无法提供的。
public final void acquireInterruptibly(int arg) 该方法提供获取状态能力,当然在无法获取状态的情况下会进入sync队列进行排队,这类似acquire,但是和acquire不同的地方在于它能够在外界对当前线程进行中断的时候提前结束获取状态的操作,换句话说,就是在类似synchronized获取锁时,外界能够对当前线程进行中断,并且获取锁的这个操作能够响应中断并提前返回。一个线程处于synchronized块中或者进行同步I/O操作时,对该线程进行中断操作,这时该线程的中断标识位被设置为true,但是线程依旧继续运行。 如果在获取一个通过网络交互实现的锁时,这个锁资源突然进行了销毁,那么使用acquireInterruptibly的获取方式就能够让该时刻尝试获取锁的线程提前返回。而同步器的这个特性被实现Lock接口中的lockInterruptibly方法。根据Lock的语义,在被中断时,lockInterruptibly将会抛出InterruptedException来告知使用者。
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } // 检测中断标志位 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException 该方法提供了具备有超时功能的获取状态的调用,如果在指定的nanosTimeout内没有获取到状态,那么返回false,反之返回true。可以将该方法看做acquireInterruptibly的升级版,也就是在判断是否被中断的基础上增加了超时控制。 针对超时控制这部分的实现,主要需要计算出睡眠的delta,也就是间隔值。间隔可以表示为nanosTimeout = 原有nanosTimeout – now(当前时间)+ lastTime(睡眠之前记录的时间)。如果nanosTimeout大于0,那么还需要使当前线程睡眠,反之则返回false。
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { long lastTime = System.nanoTime(); final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head &&tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } if (nanosTimeout <= 0) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); long now = System.nanoTime(); //计算时间,当前时间减去睡眠之前的时间得到睡眠的时间,然后被 //原有超时时间减去,得到了还应该睡眠的时间 nanosTimeout -= now - lastTime; lastTime = now; if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
上述这个图中可以理解为在类似获取状态需要排队的基础上增加了一个超时控制的逻辑。每次超时的时间就是当前超时剩余的时间减去睡眠的时间,而在这个超时时间的基础上进行了判断,如果大于0那么继续睡眠(等待),可以看出这个超时版本的获取状态只是一个近似超时的获取状态,因此任何含有超时的调用基本结果就是近似于给定超时。
public final void acquireShared(int arg) 调用该方法能够以共享模式获取状态,共享模式和之前的独占模式有所区别。以文件的查看为例,如果一个程序在对其进行读取操作,那么这一时刻,对这个文件的写操作就被阻塞,相反,这一时刻另一个程序对其进行同样的读操作是可以进行的。如果一个程序在对其进行写操作,那么所有的读与写操作在这一时刻就被阻塞,直到这个程序完成写操作。 以读写场景为例,描述共享和独占的访问模式,如下图所示:
上图中,红色代表被阻塞,绿色代表可以通过。
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
上图中,绿色表示共享节点,它们之间的通知和唤醒操作是在前驱节点获取状态时就进行的,红色表示独占节点,它的被唤醒必须取决于前驱节点的释放,也就是release操作,可以看出来图中的独占节点如果要运行,必须等待前面的共享节点均释放了状态才可以。而独占节点如果获取了状态,那么后续的独占式获取和共享式获取均被阻塞。
public final boolean releaseShared(int arg) 调用该方法释放共享状态,每次获取共享状态acquireShared都会操作状态,同样在共享锁释放的时候,也需要将状态释放。比如说,一个限定一定数量访问的同步工具,每次获取都是共享的,但是如果超过了一定的数量,将会阻塞后续的获取操作,只有当之前获取的消费者将状态释放才可以使阻塞的获取操作得以运行。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
上述逻辑主要就是调用同步器的tryReleaseShared方法来释放状态,并同时在doReleaseShared方法中唤醒其后继节点。
在上述对同步器AbstractQueuedSynchronizer进行了实现层面的分析之后,我们通过一个例子来加深对同步器的理解: 设计一个同步工具,该工具在同一时刻,只能有两个线程能够并行访问,超过限制的其他线程进入阻塞状态。 对于这个需求,可以利用同步器完成一个这样的设定,定义一个初始状态,为2,一个线程进行获取那么减1,一个线程释放那么加1,状态正确的范围在[0,1,2]三个之间,当在0时,代表再有新的线程对资源进行获取时只能进入阻塞状态(注意在任何时候进行状态变更的时候均需要以CAS作为原子性保障)。由于资源的数量多于1个,同时可以有两个线程占有资源,因此需要实现tryAcquireShared和tryReleaseShared方法,这里谢谢luoyuyou和同事小明指正,已经修改了实现。
public class TwinsLock implements Lock { private final Sync sync = new Sync(2); private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -7889272986162341211L; Sync(int count) { if (count <= 0) { throw new IllegalArgumentException("count must large than zero."); } setState(count); } public int tryAcquireShared(int reduceCount) { for (;;) { int current = getState(); int newCount = current - reduceCount; if (newCount < 0 || compareAndSetState(current, newCount)) { return newCount; } } } public boolean tryReleaseShared(int returnCount) { for (;;) { int current = getState(); int newCount = current + returnCount; if (compareAndSetState(current, newCount)) { return true; } } } } public void lock() { sync.acquireShared(1); } public void lockInterruptibly() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean tryLock() { return sync.tryAcquireShared(1) >= 0; } public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(time)); } public void unlock() { sync.releaseShared(1); } @Override public Condition newCondition() { return null; } }
上述测试用例的逻辑主要包括:
AQS简核心是通过一个共享变量来同步状态,变量的状态由子类去维护,而AQS框架做的是:
共享变量的修改都是通过Unsafe类提供的CAS操作完成的。 AbstractQueuedSynchronizer类的主要方法是acquire和release,典型的模板方法, 下面这4个方法由子类去实现:
protected boolean tryAcquire(int arg) protected boolean tryRelease(int arg) protected int tryAcquireShared(int arg) protected boolean tryReleaseShared(int arg)
acquire方法用来获取锁,返回true说明线程获取成功继续执行,一旦返回false则线程加入到等待队列中,等待被唤醒,release方法用来释放锁。 一般来说实现的时候这两个方法被封装为lock和unlock方法。
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8