AQS回答让面试官唱征服

385次阅读  |  发布于3年以前

开场

上海某核心商务楼里,正发生着一起求职者和面试官的较量。

面试官:你先自我介绍一下。

安琪拉:面试官你好,我是草丛三婊,最强中单(妲己不服),草地摩托车车手,第21套广播体操推广者,火的传人安琪拉,这是我的简历,可能有点沉,您拿好了,三十多页。

面试官:看你简历上写熟悉并发编程,熟悉到什么程度?

安琪拉:精通。对。。。问就是“精通”,头铁。

面试官:既然你这么硬,那我们省去中间环节,直接问底层原理吧。

安琪拉:【怂了】等,等,等等,还是先铺垫一下比较好,太硬了面试时间短。

面试官:那行吧。我们开始哈,java.util.concurrent 包熟悉吗?都用过哪些?

安琪拉:熟悉啊,你随便问,什么 ConcurrentMap、Lock、Reentrantlock、Semaphore、CountDownLatch、CyclicBarrier、Executors、CompletableFuture、BlockingQueue 门清。【来之前把安琪拉的博客上关于这一块的文章都背了。】

面试官:Reentrantlock、Semaphore、CountDownLatch、CyclicBarrier 这几个平常开发都用吗?

安琪拉:有的用过,比如Reentrantlock 和CountDownLatch。

面试官:那你跟我讲讲 Reentrantlock 在你们项目中的用法吧。

安琪拉:我们项目属于保密项目,无可奉告,你还是换个问题吧!

面试官:那说个不保密的项目,或者你直接告诉我 Reentrantlock 的实现原理吧。

正题

安琪拉:show time。。。【亲爱的面试官欸,来music】

Reentrantlock 底层是通过AbstractQueuedSynchronizer (简称AQS)来实现的。另外AQS的应用场景很广,CountDownLatch、ThreadPoolExecutor、Semaphore、ReentrantReadWriteLock、CyclicBarrier 都有用到。

面试官:讲个实际的例子吧。

就拿我们比较常见的 Reentrantlock 说吧。你先讲讲 Reentrantlock 的加锁的过程?

安琪拉:【这个时候一定不要急着回答面试官问题,做沉思状,不然太像是来之前背过题】首先来这么一句:你想问的是公平锁还是非公平锁?诶,这么一问显得你注重细节。

面试官:【心里估计想,还会审题,可以可以】那你先说下公平锁的加锁流程

安琪拉:【阿,该死,我可是先背的非公平锁,再背的公平锁和非公平锁的区别,书上没教过怎么先说公平锁啊。。还好安琪拉教过】

ojbk。公平锁的加锁流程,笔递给我一下,画个图吧。

【画图的时候一定不要太快,不要一步到位,先画主要流程,再画细节和分支流程,面试官一定会被你的图征服,毕竟图画的这么清晰,代码写的能差到哪去。。。】

学美术的那位,没说你,你坐下。

亲爱的面试官,我对着图讲一下:

  1. 线程调用 Reentrantlock.lock 加锁,首先判断当前锁的状态,如果是无锁状态,会看有没有其他线程在等锁,如果没有,那就设置当前线程为持有锁的线程,修改锁状态为获取成功状态;
  2. 接着上一步,还是无锁状态,但是有其他线程排在你前面等获取锁,那你不能插队,乖乖去后面排队等着获取锁;
  3. 再说有锁的情况,如果当前是有锁的状态,判断当前持有锁的线程是否是当前线程,毕竟Reentrantlock 是可重入的,重入的意思就是同一个线程可以获取同一把锁,如果恰好锁是当前线程拿着的,那就继续拿着好了呗,重入次数+1次;
  4. 如果有锁,但不是自己持有的,等于获取锁失败,也是去排队。

所以总结下来就是只有无锁并且没有其他线程在排队等锁,或者有锁并且是自己持有的锁,才能获取锁成功。

面试官:【有点东西,莫急,等我打开手机看看安琪拉文章下面是怎么问的】哦,对了,那公平锁和非公平锁区别是啥,公平锁的公平体现在哪?

安琪拉:真诚的看着面试官,此时此刻我想吟诗一首,打个比方,比如面试官是狗,不好意思,说错了,线程是一条狗,小冬这条狗想吃狗粮的时候,如果他是一只善良的狗(公平狗),他会先看是不是有其他狗在排队,如果有它会老老实实排在队尾,如果他是一只恶狗(非公平狗),不管有没有其他狗排队,会直接会去占着狗盘(这里是尝试占,不一定成功)。

面试官:【小伙子有点东西,第一次见人把公平锁说的如此清新脱俗,狗这个例子我好喜欢,不过现在是我的了,以后出去面试我就这么讲】

说了半天了,你写段代码给我看看,简单的用 Reentrantlock 实现加锁解锁。

安琪拉

static int count = 0;
//传入true 代表创建公平锁
static ReentrantLock lock = new ReentrantLock(true);

public static void main(String[] args) throws InterruptedException {
  lock.lock();
  try {
    //操作共享变量
    count++;
  } finally {
    lock.unlock();
  }
  System.out.println("释放锁结束");
}

我们从 lock.lock() 开始,进入到 Reentrantlock 内部,Reentrantlock 实现了 Lock 接口

public class ReentrantLock implements Lock, java.io.Serializable {
  //final修饰,初始化后不能修改
  private final Sync sync;

  //自定义公平或非公平锁
  public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
  }
  public void lock() {
    //FairSync.lock()
    sync.lock();
  }
}

可以看到 lock() 函数很简单,就是直接调用 sync 的 lock()方法,那 sync 就是根据初始化 ReentrantLock 的时候确定的是公平还是非公平的。

面试官:那代码都写到这份上了,再继续写呗。最好跟我讲清楚详细源码的实现?

安琪拉:【我怀疑你不会,想让我教你,肯定是想让我教你怎么看源码然后进大厂,这什么世道,看了我来错地方了,虎落平阳被犬欺,哎,没办法,谁让你是面试官呢!!】

那好,我讲讲具体怎么实现。

我们顺着上面的代码往下看,我们可以看到 ReentrantLock的加锁依赖的是 FairSync/NonfairSync的 lock() 函数。

无论是公平锁还是非公平锁,都是通过调用 acquire() 方法实现,而 acquire方法 是 FairSync 和 UnfairSync 的父类AbstractQueuedSynchronizer(AQS)中的核心方法。

static final class FairSync extends Sync {
  private static final long serialVersionUID = -3000897897090466540L;
 //直接调用AQS acquire 方法
  final void lock() {
    acquire(1);
  }
}
//AQS
public abstract class AbstractQueuedSynchronizer {
  //获取锁
  public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
      selfInterrupt();
  }
}

FairSync (公平锁)实现直接调用的 AQS 的 acquire 方法,传入值为1,代表获取锁数量(可重入)。

面试官:【这小子还挺经得住面,再听他讲讲源码】

再来,继续,讲讲 尝试获取的详细流程,比如:锁的状态怎么设计,怎么看当前谁持有锁?

安琪拉:【真要徒手撸代码谁会,Doug Lea 现场写估计也够呛】

帅面试官,你好,容我掏下手机给您看个东西,这是安琪拉的博客公众号上面的源码,我对着源码给你讲一讲

大概的流程我前面画了图,这里讲实现了:

先说二个知识点:

对照上面的流程:

  1. 获取当前的线程 final Thread current = Thread.currentThread();
  2. 获取当前锁状态 int c = getState(), state 是volatile 的,保证可见性,修改的时候通过CAS保证原子性,volatile + CAS 组合实现了安全的共享变量的修改;
  3. 如果state = 0,代表无锁状态,按照之前说的,判断队列是否有其他线程(狗的例子):hasQueuedPredecessors(),如果没有其他线程,CAS 获取锁成功,设置当前线程为 exclusiveOwnerThread;
  4. 如果state != 0,代表有其他线程持有锁,判断持有锁的线程是不是当前希望竞争锁的线程,如果是,修改state 就好了,表示多次重入。
  5. 其他情况都是获取锁失败。

面试官:那获取锁失败,AQS 是如何处理的呢?是不是就直接退出获取锁的流程?

安琪拉:不是。如果竞争锁失败,直接退出会大大降低并发度,哪能失败一次就放弃,应该继续竞争啊。

面试官:【说的好像我啊,上次她拒绝了我,我就放弃了,不行,我应该像AQS 里的线程一样,安静的等待,等女神唤醒我,她现在拒绝我是因为她还没发现我的好,对,就是这样】

安琪拉:【面试官深入回忆中】喂喂喂,面试官,还有问题吗?

面试官:【哦哦哦,刚回过神来】问到哪了?那你讲讲AQS 为什么要让竞争失败的线程等待,以及怎么让竞争失败线程等待的?

安琪拉:这个问题是个好问题,我喜欢面试官问我为什么?而不是上来一顿,,,是什么。

先说为什么要让竞争失败的线程等待?

大部分并发场景下,线程出现锁竞争的几率很低。

一般线程在获取锁之后,操作的时间很短,很快就会释放锁。

如果线程加锁刚好遇到竞争(有其他线程持有锁),这个时候如果直接放弃等待,会大大降低系统的并发度,所以应该设计一种让线程可以等待锁的过程。

再说AQS如何做到让竞争失败线程等待的。

是这样子的,如果线程获取锁失败,AQS 就让把它丢到一个等待队列中,这个队列叫CLH,是用双向链表来实现的,当线程释放锁的时候,AQS会唤醒线程所在节点的下一个节点的线程。

面试官:能具体讲讲吗?最好能给我画个图,来,就在我电脑上画吧。draw.io 软件已经给您备好了

安琪拉:【这么尊重,称呼都变了,都用上您了,肯定又想白嫖】图可以画,你先关注一下这个公众号。

基本上一张图把事情大概都说清楚了。

一步步来,回到 AQS 的acquire 方法,tryAcquire 方法获取锁失败,就走到了 acquireQueued 方法,在队列等待获取锁的流程。

//AQS
public abstract class AbstractQueuedSynchronizer {
  //获取锁
  public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
      selfInterrupt();
  }
}

既然是在队列等待获取锁,先要把线程放到队列中。addWaiter 就是干这个的,把当前线程放入队列。

addWaiter 这里其实就到了AQS 的核心机制了。

面试官:说到AQS核心机制,AQS的 核心作用是什么?主要解决的什么问题?

安琪拉:AQS 的核心作用是:它设计了一套比较通用的线程阻塞等待唤醒的机制,无锁状态加锁,有锁状态将线程放在等待队列排队获取锁。这种有效等待,相比于其他死等待或休眠机制、一方面减少了CPU空耗,另一方面机制能力很强,可以满足非常多并发控制的场景。

这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。

面试官:那你说说AQS 的线程排队获取锁的机制吧?

安琪拉:第一步,我们先来看看线程尝试获取锁失败,怎么放入等待队列的。

private Node addWaiter(Node mode) {
  //初始化节点
  Node node = new Node(Thread.currentThread(), mode);
  //第一步,尝试快速插入CHL队列
  Node pred = tail;
  if (pred != null) {
    node.prev = pred;
    if (compareAndSetTail(pred, node)) {
      pred.next = node;
      return node;
    }
  }
  //如果快速插入失败,走enq插入
  enq(node);
  return node;
}

为了便于面试官您快速理解,我把第一步插入的过程画了个图,这个典型的数据结构中在双向链表尾部添加节点的过程。

这里有个知识点着重说明一下,在CHL 队列里面,头结点是虚节点,不存储数据的。

那如果tail 节点为空,或者 CAS设置 tail 节点失败(存在其他线程入队列),该怎么办呢?

看 enq 方法的实现:

//入队
private Node enq(final Node node) {
  //循环来
  for (;;) {
    Node t = tail;
    if (t == null) { // Must initialize
      if (compareAndSetHead(new Node()))
        tail = head;
    } else {
      node.prev = t;
      if (compareAndSetTail(t, node)) {
        t.next = node;
        return t;
      }
    }
  }
}

enq 方法非常简单暴力,就是把上面节点入队列那套流程丢到 for 死循环里,直到插入成功为止,当然如果是没初始化,多了个初始化的流程。

初始化的是个虚节点,compareAndSetHead(new Node()), head 直接设置的一个不存东西的节点。

面试官:入队成功之后怎么开始排队等待的呢?

安琪拉:到了非常关键的地方了,也算是整个 AQS 最核心的模块。

这里先交代一下Node 的属性:

//等待状态
volatile int waitStatus;
//前驱
volatile Node prev;
//后继
volatile Node next;
//等待线程
volatile Thread thread;
//指向下一个处于CONDITION状态的节点
Node nextWaiter;

waitStatus 非常重要,也是关键,有下面几个枚举值:

枚举 含义
CANCELLED 为1,表示线程获取锁的请求已经取消了
SIGNAL 为-1,表示线程已经准备好了,就等资源释放了
CONDITION 为-2,表示节点在等待队列中,节点线程等待唤醒
PROPAGATE 为-3,当前线程处在SHARED情况下,该字段才会使用
0 当一个Node被初始化的时候的默认值

前面已经把线程放入等待队列,之后,开始在队列里面排队等待获取锁,代码如下:

前面讲线程已经放到等待队列了,队列里面的每个线程都会执行 acquireQueued 这套逻辑,逻辑得流程图在上面流程B中。

面试官:这里为什么是判断 node 节点的前驱节点是否是head 头结点来确认是否应该调 tryAcquire 获取锁?

安琪拉:CLF 是FIFO 先进先出的原则,让队列前面的优先获取锁。那什么时候时候轮到当前线程获得锁呢,那就是队列前面没有其他线程节点了,入下图,只有当前节点前面是head 节点,才是排到了队列最前面。我们前面说过 head节点是虚节点,不存储真实有效信息。

面试官:那获取锁成功之后不是应该把当前节点直接remove移除掉,为什么是把 head 指向成当前节点,这个是什么操作呢?

安琪拉:当前节点获取锁成功(不再需要继续for 循环或阻塞等待锁),可以开始执行后续的业务逻辑了。这时候不是将当前节点从队列移除,而是设置为头结点,二方面原因:

面试官:那当前节点如果一直获取不到锁,难道一直for 循环吗?

安琪拉:当然不是,一直for 循环太消耗CPU 性能了,获取锁失败后会进入阻塞状态。我们来看下 shouldParkAfterFailedAcquire 的代码。

获取锁失败的场景:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  int ws = pred.waitStatus;
  if (ws == Node.SIGNAL)
    //前驱节点是SIGNAL这个状态代表,如果当前线程释放了持有的资源或者被取消,需要唤醒后继结点上的线程。
    //返回true是指当前节点的线程可以安心的阻塞,因为前驱节点会唤醒你
    return true;
  if (ws > 0) {
    // waitStatus > 0, 只有 CANCELLED 状态 =1 ,代表节点已经取消获取锁,把这些取消的节点从队列里面移除
    // 这里的操作是非常常规的链表将当前节点的前驱指向前驱的前驱,
    do {
      node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
  } else {
    // waitStatus 可能是 0 或者 PROPAGATE,表示我们需要一个 SIGNAL状态,还不能直接阻塞,
    // false返回for循环还能尝试获取一下锁
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  }
  return false;
}

如果尝试获取锁失败之后,会调用 shouldParkAfterFailedAcquire 判断是否应该阻塞。

是否应该阻塞的条件是根据前一个节点的waitStatus,

面试官:如果线程一直获取不到锁,那岂不是一直 for 循环抢占锁吗?

安琪拉:for 循环看到前节点 waitStatus = SIGNAL,会调用 parkAndCheckInterrupt

代码如下:

private final boolean parkAndCheckInterrupt() {
  //阻塞当前节点线程
  LockSupport.park(this);
  //线程从这个地方恢复之后开始执行,返回中断状态(同时清除中断状态)
  return Thread.interrupted();
}

线程调用 LockSupport.park 后会阻塞,等前面节点释放锁之后,状态变为SIGNAL 之后会唤起当前节点。

面试官:什么时候线程会释放锁,唤起队列中阻塞的线程呢?

安琪拉:线程取消获取锁(比如超时),或者执行完成之后释放锁。

继续看代码:

public final boolean release(int arg) {
  //tryRelease 实现在Reentrantlock 类中
  if (tryRelease(arg)) {
    Node h = head;
    //头节点不为空,并且等待状态不为0,唤醒后续节点
    if (h != null && h.waitStatus != 0)
      unparkSuccessor(h);
    return true;
  }
  return false;
}

面试官:能继续深入的讲讲吗?

安琪拉:给后面的面试官一点机会,你都问了,他问什么?好不

面试官:那下次再约吧。你有什么问题想问我的吗?

安琪拉:你会唱 “征服” 吗 ?

面试官:。。。。

【未完待续】

本文以 Reentrantlock 为出发点,讲解了 AQS 的核心CHL 逻辑, 还只讲了皮毛,解锁,取消获取锁都没讲,因为篇幅原因,也只介绍了独占锁和同步队列,共享锁和条件队列也没讲,打算后面用3 ~ 4篇文章把AQS 讲清楚。

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8