Redis内部提供了一个非常实用的数据结构,这款数据结构就是List。
List是一款简单的字符串列表,这类字符串列表所能存储的元素个数上限为(2^32 -1 )个Redis在底层对它提供了非常丰富的API接口供外界调用,从而能实现不同的效果。
环境准备
首先我们需要拉取docker的相关镜像文件到本地机器:
docker pull redis:4.0
然后便是启动这个容器:(启动的时候需要指定相关的镜像文件和对应镜像版本号)
docker run -p 6379:6379 -d --name myredis redis:4.0
于是容器启动后便ok了:
好了,现在我们有了一个完整的redis实操环境 下边我们先从应用方面开始着手List这款数据结构。
队列结构
如果使用了list结构的时候,我们是从一端插入,从另外一端弹出,那么此时所处理的一个数据结构就会是一个队列的模式,例如说:
lpush list a
lpush list b
lpush list c
那么这个时候,数据结构list当中的元素从左到右依次排序的信息就会是:
c,b,a
这个时候如果我们从数据结构list队列中将元素依次取出
rpop list
rpop list
rpop list
那么在每次执行redis指令之后,它所弹出的元素就会是:
a,b,c
栈结构
如果我们使用了list结构,并且每次操作都是在同一端去做处理,例如说:
lpush list a
lpush list b
lpush list c
那么这个时候,数据结构list的元素从左到右依旧会是:
c,b,a
如果这个时候我们从list结构的同一端中依次将元素弹出:
lpop list
lpop list
lpop list
执行之后的元素信息则会是:
c,b,a
所以可以看出,redis的一种list数据结构类型,在采取不同的指令进行操作之后可以模拟出不同数据结构的效果,整体使用上还是比较灵活的。
List结构的实现方式
在版本3.2之前,Redis 列表list使用两种数据结构作为底层实现:
压缩链表
压缩列表(ziplist)是Redis为了节省内存而开发的,是由一系列特殊编码的连续内存块组成的顺序型数据结构,一个压缩列表可以包含任意多个节点(entry),每个节点可以保存一个字节数组或者一个整数值。重点是内存连续。
ziplist数据结构内部其实有很多个元素部分:
zlbytes 记录了整个ziplist的内存大小是多少字节
zltail 记录了整个内存块中的尾部内存地址
zlen 记录了整个内存块的长度
ziplist中的每个元素都是由一个entry组成的
entry的内部会包含有每个元素的一些基础信息,entry内部包含有:
prevlengh 记录上一个节点的长度,为了方便反向遍历ziplist
encoding 编码方式
data 实际节点的值
双端链表
prev和next两个指针 , 重点是可以从前往后也可以从后往前 , 这就可以实现lpush rpush这些指令了
因为用的链表 , 所以这也就导致了lindex指令 , 获取某个索引值的元素 , 需要遍历链表才可以获取到 , 时间复杂度是 O(n)
当列表对象可以同时满足下列两个条件时,列表对象采用压缩链表编码:
(1)列表对象保存的所有字符串元素的长度都小于64字节;
(2)列表元素保存的元素数量小于512个;
以上两个条件的上限值可以在配置文件中修改 list-max-ziplist-value选项和 list-max-ziplist-entries选项,否则采用双端链表编码
redis3.2版本以后采用的快速列表
quicklist 是一个双向链表,并且是一个ziplist的双向链表,也就是说quicklist的每个节点都是一个ziplist的entries。结合了两者的优点。
因为在ziplist中,每个zlentry都存储着前一个节点所占的字节数,而这个数值又是变长编码的。假设存在一个压缩列表,其包含e1、e2、e3、e4…..,e1节点的大小为253字节,那么e2.prevrawlen的大小为1字节,如果此时在e2与e1之间插入了一个新节点e_new,e_new编码后的整体长度(包含e1的长度)为254字节,此时e2.prevrawlen就需要扩充为5字节;如果e2的整体长度变化又引起了e3.prevrawlen的存储长度变化,那么e3也需要扩…….如此递归直到表尾节点或者某一个节点的prevrawlen本身长度可以容纳前一个节点的变化。其中每一次扩充都需要进行空间再分配操作。删除节点亦是如此,只要引起了操作节点之后的节点的prevrawlen的变化,都可能引起连锁更新。
连锁更新在最坏情况下需要进行N次空间再分配,而每次空间再分配的最坏时间复杂度为O(N),因此连锁更新的总体时间复杂度是O(N^2)。 即使涉及连锁更新的时间复杂度这么高,但它能引起的性能问题的概率是极低的:需要列表中存在大量的节点长度接近254的zlentry。
由于ziplist连锁更新的问题,也使得ziplist的优缺点极其明显;也使得后续Redis采取折中,替换了ziplist。
ziplist的主要优点是节省内存,但它上面的查找操作只能按顺序查找(可以是从前往后、也可以从后往前)。 ziplist将数据按照一定规则编码在一块连续的内存区域,目的是节省内存,这种结构并不擅长做修改操作。一旦数据发生改动,就会引发内存realloc,可能导致内存拷贝。
底层原理小结
Redis中的列表list,在版本3.2之前,列表底层的编码是ziplist和linkedlist实现的,但是在版本3.2之后,重新引入 quicklist,列表的底层都由quicklist实现。
在版本3.2之前,当列表对象中元素的长度比较小或者数量比较少的时候,采用ziplist来存储,当列表对象中元素的长度比较大或者数量比较多的时候,则会转而使用双向列表linkedlist来存储。
这两种存储方式的优缺点
可以认为quickList,是ziplist和linkedlist二者的结合;quickList将二者的优点结合起来。 官方给出的定义
A generic doubly linked quicklist implementation
A doubly linked list of ziplists
quickList是一个ziplist组成的双向链表。每个节点使用ziplist来保存数据。 本质上来说,quicklist里面保存着一个一个小的ziplist。
实战:如何根据List结构来实现一款阻塞队列
为啥要用Redis作为一款阻塞队列呢?
其实这一个技术设计主要取决于当前技术团队的一个发展阶段。业界内其实已经有了很多优秀的开源的消息队列中间件,但是如果我们换一个角度来思考问题,搭建一套成熟稳定的消息中间件所需要花费的时间成本,人力成本其实还是蛮高的。尤其是在创业小公司阶段,面对需要快速上线的业务需求,你作为一名leader还会选择使用这种如此之重的技术方案嘛?(我大概率不会)
打个比方:
我只想给家里小孩买个能打电话的手机带去学校,主要目的是为了让孩子在有需要沟通的时候能够联系到父母,那么此时在你面前有两类选择:老人机(可以打电话),新出的iphone 13(功能特别丰富)。此时你会选择哪个?
brpoplpush 指令
假设说当我们需要将某个元素弹出一个list之后去做一些额外的业务场景处理,此时就会需要考虑到如果业务场景处理过程中出现了异常,那么弹出的元素就会出现丢失情况。
此时比较好的处理方式便是将这个元素在弹出之后插入到一个备份的队列当中,等到真正将元素处理完毕后,再从备份队列中移除。
好了,接下来我们来试试如何通过brpopLpush设计一款适合小型公司使用的消息队列组件。
这里我只给出一些简单设计代码思路, 如果想要把这个组件进行更完善的处理还需要针对内部的具体策略做一些完善和调整。
首先我们来看具体的实现结果:
往redis中发送消息:
然后是消费这条消息:
整个代码的使用方式看起来比较简单易懂。那么该如何去做具体的实现呢?整体的代码模块如下:
由于时间原因,这个组件还有关于重试策略,接入spi模块没有开发完成,但是整体的核心逻辑已经成型。
首先需要定义一个消息队列服务:
public interface IMessageQueueService {
SendResult send(MessageDTO messageDTO);
SendResult sendAsync(MessageDTO messageDTO);
}
此外还需要定义消息发送的DTO
public class MessageDTO implements Serializable {
private static final long serialVersionUID = 2933961555708653068L;
private String jsonParam;
public String getJsonParam() {
return jsonParam;
}
public void setJsonParam(String jsonParam) {
this.jsonParam = jsonParam;
}
@Override
public String toString() {
return "MessageDTO{" +
"jsonParam='" + jsonParam + '\'' +
'}';
}
}
除了这些之外,还有消息发送的结果响应对象:
public class SendResult {
private int code;
private String desc;
public static SendResult sendSuccess(){
return new SendResult(SEND_SUCCESS,"发送成功");
}
public static SendResult sendAsyncSuccess(){
return new SendResult(SEND_ASYNC_SUCCESS,"异步发送成功");
}
public static SendResult sendFail(){
return new SendResult(SEND_FAIL,"发送失败");
}
public SendResult(int code, String desc) {
this.code = code;
this.desc = desc;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
}
发送数据之后的响应状态值:
public class SendResultStatus {
public static int SEND_ASYNC_SUCCESS =1;
public static int SEND_SUCCESS = 0;
public static int SEND_FAIL = -1;
}
具体的消息发送,我这里是通过引入了redis来实现消息的存储:
public class RedisMessageQueueService implements IMessageQueueService {
private IRedisService redisService;
private String queueName;
private String backQueueName;
public RedisMessageQueueService(String queueName, String backQueueName,IRedisService redisService) {
this.queueName = queueName;
this.backQueueName = backQueueName;
this.redisService = redisService;
}
@Override
public SendResult send(MessageDTO messageDTO) {
if (messageDTO == null) {
return SendResult.sendFail();
}
String jsonParam = JSON.toJSONString(messageDTO);
Long result = redisService.lpush(queueName, jsonParam);
if (result != null) {
return SendResult.sendSuccess();
}
return SendResult.sendFail();
}
@Override
public SendResult sendAsync(MessageDTO messageDTO) {
if (messageDTO == null) {
return SendResult.sendFail();
}
String jsonParam = JSON.toJSONString(messageDTO);
Long result = redisService.lpush(queueName, jsonParam);
if (result != null) {
return SendResult.sendAsyncSuccess();
}
return SendResult.sendFail();
}
}
这里你可能会疑惑,为什么这个对象没有加入Spring容器进行管理(其实是因为没有完善好,所以内部的很多属性都是在Spring容器初始化阶段通过反射来进行注入的...)
相关的属性注入配置类如下:
@Configuration
public class RedisQueueConfiguration implements BeanPostProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisQueueConfiguration.class);
@Resource
private ApplicationContext applicationContext;
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class clazz = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean) : bean.getClass();
RedisQueueSubscription redisQueueSubscription = (RedisQueueSubscription) clazz.getAnnotation(RedisQueueSubscription.class);
if(redisQueueSubscription!=null && RedisMessageSubscription.class.equals(clazz.getSuperclass())) {
String queueName = redisQueueSubscription.queueName();
String backUpQueueName = redisQueueSubscription.backUpQueueName();
int timeOut = redisQueueSubscription.timeOut();
try {
Field queueNameField = clazz.getSuperclass().getDeclaredField("queueName");
queueNameField.setAccessible(true);
queueNameField.set(bean,queueName);
Field backUpQueueNameField = clazz.getSuperclass().getDeclaredField("backUpQueueName");
backUpQueueNameField.setAccessible(true);
backUpQueueNameField.set(bean,backUpQueueName);
Field timeOutField = clazz.getSuperclass().getDeclaredField("timeOut");
timeOutField.setAccessible(true);
timeOutField.set(bean,timeOut);
Field redisServiceField = clazz.getSuperclass().getDeclaredField("redisService");
redisServiceField.setAccessible(true);
redisServiceField.set(bean, applicationContext.getBean(IRedisService.class));
Method methods[] = clazz.getSuperclass().getMethods();
for (Method method : methods) {
if("onReceive".equals(method.getName())){
method.invoke(bean);
}
}
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
Field[] fields = clazz.getDeclaredFields();
for (Field field : fields) {
field.setAccessible(true);
if (field.isAnnotationPresent(RedisQueue.class)) {
RedisQueue redisQueue = field.getAnnotation(RedisQueue.class);
String queueName = redisQueue.queueName();
String backUpQueueName = redisQueue.backQueueName();
RedisMessageQueueService redisMessageQueueService = new RedisMessageQueueService(queueName,backUpQueueName, applicationContext.getBean(IRedisService.class));
try {
LOGGER.info("init redis queue ====== ");
field.set(bean,redisMessageQueueService);
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}
return bean;
}
}
还有一个自定义的注解,用来制定消息需要存储的队列名称以及备份队列的名称(主要是考虑到消费失败做重复发送操作)
@Target(value = ElementType.FIELD)
@Documented
@Retention(value = RetentionPolicy.RUNTIME)
public @interface RedisQueue {
String queueName() default "";
String backQueueName() default "";
}
发送部分大概就这样了。
那么消费端则是采用了一次抽象,将核心的处理逻辑抽象到了父类对象中,具体的实现留给了子类去做拓展。
public abstract class RedisMessageSubscription implements IMessageSubscription {
private IRedisService redisService;
private String queueName;
private String backUpQueueName;
private int timeOut;
public RedisMessageSubscription(){
}
/**
* 留给子类执行核心程序
*
* @param result
* @return
*/
public abstract MessageConsumeResult dataHandle(String result);
@Override
public void onReceive() {
Thread thread = new Thread(new ReceiveDataThread());
thread.start();
}
class ReceiveDataThread implements Runnable{
@Override
public void run() {
while (true) {
try {
String result = redisService.brPopLpush(queueName,backUpQueueName,timeOut);
MessageConsumeResult messageConsumeResult = dataHandle(result);
if(messageConsumeResult.isSuccess()){
continue;
} else {
//进入一个重试阶段
}
}catch (Exception e){
//有时候会出现链接超时,所以如果一旦订阅了某个队列之后出现异常,那么就需要考虑到这种中断的情况
e.printStackTrace();
}
}
}
}
public IRedisService getRedisService() {
return redisService;
}
public void setRedisService(IRedisService redisService) {
this.redisService = redisService;
}
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public String getBackUpQueueName() {
return backUpQueueName;
}
public void setBackUpQueueName(String backUpQueueName) {
this.backUpQueueName = backUpQueueName;
}
public int getTimeOut() {
return timeOut;
}
public void setTimeOut(int timeOut) {
this.timeOut = timeOut;
}
}
消费响应类如下:
public class MessageConsumeResult {
private int code;
private String desc;
public MessageConsumeResult(int code, String desc) {
this.code = code;
this.desc = desc;
}
public static MessageConsumeResult consumeSuccess() {
return new MessageConsumeResult(CONSUME_SUCCESS, "消费成功");
}
public boolean isSuccess() {
return CONSUME_SUCCESS == this.getCode();
}
public static MessageConsumeResult consumeFail() {
return new MessageConsumeResult(CONSUME_SUCCESS, "消费成功");
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
}
响应的一些常量配置类为:
public class MessageConsumeResultStatus {
public static int CONSUME_SUCCESS =1;
public static int CONSUME_FAIL = -1;
}
因此,当需要程序需要使用的时候,只需要自定义一个consumer,然后继承这个抽象的父亲类对象即可。如下:
@RedisQueueSubscription(queueName = "message-list",backUpQueueName = "backup-message-list")
@Component
public class RedisMessageConsumer extends RedisMessageSubscription{
@Override
public MessageConsumeResult dataHandle(String result) {
System.out.println("result is "+ result);
return MessageConsumeResult.consumeSuccess();
}
}
这款组件还有很多不完善的部分,最近也在尝试继续完善它,例如可以考虑加入一些重试机制,关于重试部分打算参考这几篇文章中给出的思路。
https://cloud.tencent.com/developer/article/1038304
http://bittechblog.com/blog/article/8
里面的干货讲解都很赞~~
这是国庆节前的最后一篇文章,Idea在这里先预祝各位读者们国庆节快乐~~
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8