阻塞队列yyds

346次阅读  |  发布于3年以前

阻塞队列简介

大家好, 距离上一篇原创又有一个多星期来,这次给大家带来阻塞队列的深入分析。

对于阻塞队列在上一篇线程池的分析中有分析到,大家可以参考一下:[线程池全面分析] 。

那什么是阻塞队列呢?按照我个人的理解就是,队列他本身是一个容器,容器就是来装元素的,那么装元素就会有放元素和取元素的两个操作,而这个阻塞两个字就是体现在放元素和取元素两个操作;队列的另一个特性就是FIFO(first in first out)

所以用一句话来概括阻塞队列就是:是一个支持阻塞式的插入和移除的FIFO队列

  1. 插入元素的时候:当队列满了,线程阻塞等待,直到队列中的元素被取出,然后被唤醒继续往队列里面插入元素
  2. 获取元素的时候:队列为空,线程阻塞等待,直到队列有元素,被唤醒

那一定是阻塞吗?不一定,上面描述是支持,那么可以非阻塞吗,答案是可以的,它也可以支持当队列不满足插入或者移除元素的条件,会立即返回一个特殊值,表示这个操作是否成功,比如true和false。

一般阻塞队列比较常用于生产者与-消费者模式。比如:我上一篇写的线程池的文章中就有分析到。

阻塞队列充当生产者与-消费者模式的容器,生产者往阻塞队列里面添加数据,消费者从队列中获取数据进行消费。

那么,在Java中常用的几种阻塞队列有哪些呢?下面我们来详细的聊一聊具体阻塞队列有哪些?

阻塞队列种类

先放一张阻塞队列的继承关系图:

最顶层的是Iterable,接下来的接口是Collection,对于Collection,我们对于它的印象更多的是集合的父接口,因为在Collection里面更多定义的是我们所熟悉的集合方法:

第三层就是Queue接口,在这个接口里面定义了队列的插入和移除的基本方法:

所以,要研究阻塞队列可以研究Queue以下的接口以及实现类就行了,在Queue接口下面就是BlockingQueue子接口,在这个子接口里面扩展了更多的对于阻塞队列的操作的方法,具体如下:

最下面的就是具体的阻塞队列的实现类,一共有七个:

  1. ArrayBlockingQueue:是一个由数组构成的有界队列,初始化的时候一定要传入队列的初始容量大小,底层用于存储的元素的结构为:Object[] items
  2. LinkedBlockingQueue:是一个又链表组成的无界队列,默认值的队列容量的大小是:Integer.MAX_VALUE,这个值很大,所以默认认为无界,它也可以在初始化的时候,传入队列的容量,指定队列容量的大小。
  3. PriorityBlockingQueue:是一个支持优先级的有界阻塞队列,默认的队列大小是11DEFAULT_INITIAL_CAPACITY),也可以在初始化队列的时候传入队列的大小,它的优先级在初始化的时候通过构造函数传入一个Comparator接口的来实现:
  4. DelayQueue: 是一个支持延迟获取元素的有界队列,底层使用PriorityQueue是实现,初始化的容量大小为11: 像类似于延迟的场景,都可以使用它来实现。比如:定时任务的调度、简单的实现订单超时状态的取消(消息队列的延时队列)。
  5. SynchronousQueue: 是一个不支持存储元素的阻塞队列,一个put操作就对应一个take操作,假如对用的操作还没执行,就会进行阻塞。
  6. LinkedTransferQueue: 是一个由链表组成的无界阻塞队列,它有两个方法比较有意思transfer和tryTransfer。transfer是将生产者的传入的数据立即传给消费者,假如没有消费者等待接收数据,就会把数据放入队列的尾节点;tryTransfer方法是尝试将生产者传入的数据传给消费者,若是没有消费者等待接收数据,就会返回false。还提供了基于tryTransfer方法的超时方法。
  7. LinkedBlockingDeque: 是一个由链表构成的双端阻塞队列,支持双向的添加和移除操作(addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法),相对于单向的队列来说,双端队列能够减少一半的竞争

对于阻塞队列主要研究的就是元素怎么进入队列里面,上面的BlockingQueue接口里面对于插入和移除都有好几个方法,那么这个方法又有什么不同呢?可以参考下面的表格:

那么对于阻塞队列,它又是怎么实现上面的特点的呢?下面我们通过源码分析的方式进行深入的剖析。

阻塞队列实现原理

这里以ArrayBlockingQueue的源码进行深入的分析,其他几个的源码可以结合相关资料自行了解。

先来看看ArrayBlockingQueue的add操作,add操作是队列满的时候抛出以异常,源码实现挺简单的:

ArrayBlockingQueue的add操作是直接调用父类AbstractQueue的add操作:

add操作就是通过调用offer操作返回的特殊值进行判断实现,来看看offer的实现:

offer方法实现主要干了以下四件事:

  1. ReentrantLock加锁,避免多线程的时候插入操作,出现数据的混乱。
  2. 判断若是队列满了就直接返回false,表示插入不成功。
  3. 最后则进行插入enqueue(e)。
  4. 解锁

enqueue方法是真正实现插入元素的动作,具体的源码实现如下:

上面我们聊到在ArrayBlockingQueue的源码实现里面使用Object[] items来存储元素,也就是上面源码实现的this.items,直接通过**items[putIndex] = x;**的方式将元素插入队列里面。

然后再判断putIndex是否已经达到队列的长度,若是已经达到队列长度就重新赋值为0。

最后记录count++ 也就是队列中元素的个数。这里有个比较有意思的就是notEmpty.signal() 方法,与它对应就是:notEmpty.await()

notEmpty是一个Condition类型的元素,对于Condition这里制作大致的介绍,后面的原创会详细的介绍。

在ArrayBlockingQueue源码实现里面有等待条件队列,分别是:notEmptynotFull

这两个的作用分别是:当向队列里面添加元素的时候或者向队列获取元素,队列已经满了或者队列为空,notEmpty可以在队列为空时,调用notEmpty.await(),阻塞线程

当向队列里面添加了元素时,就会调用notEmpty.signal(),通知因为队列为空而等待的线程:

notFull则是在队列满的时候,实现线程的通信,当向队列中添加元素的时候,队列满了,实现线程等待;或者从队列中获取了元素,则通知等待的线程往可以向队列中插入元素。

所以总的一句话来概括Condition的作用就是:实现线程之间的等待/通知机制的通信,其实等待通知机制我们直到在Object方法里面也有对应的wait/notify方法实现,具体两者的区别可以自己深入去了解,这一篇暂不做深入分析。

我们来看看Condition又是怎么实现等待/通知机制的呢?分别来看看await/signal的源码实现,signal跟踪到底层源码的实现,最终实现的重要方法是:LockSupport.unpark(node.thread)

LockSupport.unpark(node.thread) 这个就是真正的实现唤醒线程的操作,那么对应的await的方法,肯定是就是:LockSupport.park() 方法。再往下面研究就是native方法了:

所以Condition是通过LockSupport.park与LockSupport.unpark实现线程的等待通知机制。

研究过AQS的源码的人会知道,在AQS里面也会有Condition,因为AQS也有独占式锁,阻塞式的获取锁资源。

所以,到这里大家应该知道Condition的作用了吧,就是实现线程的等待与通知,可以用来实现阻塞式的资源获取。

我们来看看看put方法,阻塞式的:

从它的源码里面就能够显而易见的发现:当队列满的时候,调用notFull.await(),阻塞等待,直到线程被中断,或者有另外的线程唤醒notFull.signal()。这个方法比较简单,不过对于理解线程之间的通信,实现阻塞式的等待还是非常有帮助的,顺便给大家一张图:

图片来源于百度

最后来看看offer实现超时退出,具体源码如下:

具体的实现,还是调用notFull.awaitNanos(nanos),进行超时等待,而它的底层是调用LockSupport.parkNanos(this, nanosTimeout),最终还是回到了这个LockSupport类:

在超时退出方法offer里面有个有意思的就是这个lock.lockInterruptibly(),在ReentrantLock获取锁的方法有好几个。

分别有lock、tryLock、lockInterruptibly等,他们的区别就是lock是是阻塞的,tryLock是非阻塞(返回特殊值)、lockInterruptibly是阻塞可中断的

在lock的源码中作者也有解释到:如果锁由另一个线程持有,则当前线程出于线程调度目的将被禁用,并处于休眠状态,直到获得锁为止,此时锁持有计数设置为1

所以lockInterruptibly方法在跳出,有以下几种可能:

  1. 如果获取锁成功,方法结束。
  2. 如果锁无法获取,当前线程被阻塞,直到下面情况发生:
  3. 当前线程(被唤醒后)成功获取锁
  4. 当前线程被其他线程中断

lockInterruptibly底层源码跟踪后也是通过**LockSupport.park(this)**方法来实现线程的阻塞:

所以很多并发的工具类底层实现都基本差不多(Condition、LockSupport),因为这些都是通用的,应用层的工具类都是通过各种已有的工具类堆叠起来的。

最后分享一波关于JVM知识总结的精美图片给你们,让你们卷一下,哈哈哈

好了今天的阻塞队列的知识就先分享到这里,我们下一期见 ,感谢大家!!!最后提前祝大家国庆快乐!!!

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8