作为后端开发,redis是工作中最绕不开的中间件之一,在工作中通常有以下几个常用用途:
另外丰富的redis数据类型支持了一些扩展功能,如排行榜,消息队列,布隆过滤器,位图等等。而redis的底层实现是十分简单的,核心源码也仅有几万行。本文就带大家来领略,小小的redis是如何实现这些复杂功能的。
*注:本文介绍的源码为redis 5.0.14版本 *
C语言存储字符串的问题:
C语言中表示字符串结尾的符号是'\0',如果字符串本身就具有'\0'字符,就会被截断,即非二进制安全。
计算字符串的长度性能低
C语言中有一个计算字符串长度的函数strlen,但这个函数与Java的不一样,需要遍历整个字符串来计算长度,时间复杂度是O(n),如果需要在循环中计算,性能将十分低下。
字符串拼接性能低
因为C语言字符串不记录长度,对于一个长度n的字符串来说,底层是n+1的字符数组。
char a[n+1]
如果需要增长字符串,则需要对底层的字符数组进行重分配的操作;接下来由数据结构入手,看看redis是如何解决这几个问题的。
struct sds{
int len; //buf中已占字符数
int free; //buf中空闲字符数
char buf[];
}
除了保存字符串的指针buf,还需要记录使用空间和空闲的空间。redis老版本也是这样设计的,这样的设计解决了开头的三个问题:
小字符串空间浪费的问题:
在64位系统中,字符串头部的len和free各占四个字节,对于大字符串而言,这个数字还好,但是如果是小字符串呢,比如buf本身只有一个字节,而头部就占了八个字节,肯定不合适。
redis新版本就给了一种方案,根据buf字符串的长度不同,使用不同的结构体存储,同时新增一个单字节变量flags,保存不同的类型。
但是对于那种只有一个字节长的字符串,如何优化呢?对于那种小字符串,redis中使用一个字节的标志位flags表示 低三位存储类型(type),高五位存储长度(len),而高五位 2^5-1=31 可以存储最多31个字节的字符串。
而大于31个字节的其他几种类型字符串,一个字节存不下,就使用两个变量保存已使用空间和总长度(保留flags字段标识类型,新增len字段标记长度)。
sdshdr8,sdshdr16,sdshdr32,sdshdr64 结构都是一样的,区别在于存储的变量大小。
struct __attribute__ ((__packed__)) sdshdr5 {
unsigned char flags; /* 3 lsb of type, and 5 msb of string length */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr8 {
uint8_t len; //已使用
uint8_t alloc; // 总长度
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr16 {
uint16_t len; /* used */
uint16_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr32 {
uint32_t len; /* used */
uint32_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr64 {
uint64_t len; /* used */
uint64_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
只介绍扩容操作,其它操作都比较简单,可自行阅读。
扩容源码如下:
sds sdsMakeRoomFor(sds s, size_t addlen) {
struct sdshdr *sh, *newsh; //定义两个 sdshdr 结构体指针
size_t free = sdsavail(s); // 获取 s 目前空闲空间长度
size_t len, newlen; // 前者存储扩展前 sds 字符串长度,后者存储扩展后 sds 字符串长度
if (free >= addlen) return s; // 如果空余空间足够,直接返回
len = sdslen(s); // 获取 s 目前已占用空间的长度
sh = (void*) (s-(sizeof(struct sdshdr))); //结构体指针赋值
newlen = (len+addlen); // 字符串数组 s 最少需要的长度
// 根据新长度,为 s 分配新空间所需的大小
if (newlen < SDS_MAX_PREALLOC) // 如果新长度小于 SDS_MAX_PREALLOC(默认1M),那么为它分配两倍于所需长度的空间
newlen *= 2;
else
newlen += SDS_MAX_PREALLOC; // 否则,分配长度等于目前长度加上 SDS_MAX_PREALLOC(默认1M)
newsh = zrealloc(sh, sizeof(struct sdshdr)+newlen+1);
if (newsh == NULL) return NULL;
newsh->free = newlen - len;
return newsh->buf;
}
跳跃表类似一个多层的链表,首先从最高层开始查找,如果下一个节点的值大于要查找的值或者下一个节点为null,则往下一层查找。通过空间换时间的策略,将时间复杂度控制在O(logn)。
例如查找51这个数:
首先看一下zskiplistNode的数据结构,zskiplistNode表示跳跃表中的一个节点。
typedef struct zskiplistNode {
sds ele;// 数据
double score; //权重比
struct zskiplistNode *backward; //后退指针,指向当前节点底层 前一个节点
struct zskiplistLevel {
struct zskiplistNode *forward; // 指向当前层的前一个节点
unsigned long span; //forward 指向前一个节点的与当前节点的间距
} level[];
} zskiplistNode;
zskiplist 表示跳跃表:
typedef struct zskiplist {
struct zskiplistNode *header, *tail; //分别指向头结点和尾结点
unsigned long length; //跳跃表总长度
int level; //跳跃表总高度
} zskiplist;
其中,头节点是跳跃表的一个特殊节点,它的level数组元素个数为64。头节点在有序集合中不存储任何member和score值,ele值为NULL, score值为0;也不计入跳跃表的总长度。头节点在初始化时,64个元素的forward都指向NULL, span值都为0。
3.3.1 创建跳跃表
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
zsl = zmalloc(sizeof(*zsl));
zsl->level = 1;
zsl->length = 0;
// 头结点
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
zsl->tail = NULL;
return zsl;
}
简单来说就是创建了头结点,创建了64个level数组。
3.3.2 随机层高
创建和插入节点的之前,当前节点需要在哪几层出现,是通过计算当前节点的level值, 而level值是redis通过伪随机得出的,层数越高,节点出现的概率越小。
int zslRandomLevel(void) {
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
层高的数学期望:
高度为1的概率 (1-p)
高度为2的概率 p(1-p)
高度为3的概率 (p2)*(1-p)
….
高度为n的概率(p(n-1))*(1-p)
期望层高 E=1*(1-p)+2p(1-p)+3p2(1-p)... =1/(1-p)
当 p=0.25(redis默认值) 时,跳跃表节点的期望层高为 1/(1-0.25)≈1.33。即多浪费了30%的空间,redis的跳表使用了较低的空间成本,实现了时间复杂度的大减少
3.3.3 插入节点
插入节点总的来说一共四步
源码如下:
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score));
// 查找节点
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
/* we assume the element is not already inside, since we allow duplicated
* scores, reinserting the same element should never happen since the
* caller of zslInsert() should test in the hash table if the element is
* already inside or not. */
//调整高度
level = zslRandomLevel();
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
x = zslCreateNode(level,score,ele);
//插入节点
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is inserted here */
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}
3.3.4.删除节点
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) { // update[i].level[i] 的 forward 节点是 x 的情况,需要更新 span 和 forward
update[i]->level[i].span += x->level[i].span - 1;
update[i]->level[i].forward = x->level[i].forward;
} else {// update[i].level[i] 的 forward 节点不是 x 的情况,只需要更新 span
update[i]->level[i].span -= 1;
}
}
if (x->level[0].forward) { // 如果 x 不是尾节点,更新 backward 节点
x->level[0].forward->backward = x->backward;
} else { // 否则 更新尾节点
zsl->tail = x->backward;
}
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--; //更新跳跃表 level
zsl->length--; // 更新跳跃表长度
}
3.4.5 跳跃表的应用
zset集合插入第一个元素时,会判断下面两种条件:
满足任一条件 Redis 就会采用跳跃表作为底层实现,否则采用压缩列表作为底层实现方式。
3.4.6 题外话
Q:为什么redis使用跳跃表而不是红黑树呢
引用一下原作者的话:
There are a few reasons: They are not very memory intensive. It's up to you basically. Changing parameters about the probability of a node to have a given number of levels will make then less memory intensive than btrees. A sorted set is often target of many ZRANGE or ZREVRANGE operations, that is, traversing the skip list as a linked list. With this operation the cache locality of skip lists is at least as good as with other kind of balanced trees. They are simpler to implement, debug, and so forth. For instance thanks to the skip list simplicity I received a patch (already in Redis master) with augmented skip lists implementing ZRANK in O(log(N)). It required little changes to the code.About the Append Only durability & speed, I don't think it is a good idea to optimize Redis at cost of more code and more complexity for a use case that IMHO should be rare for the Redis target (fsync() at every command). Almost no one is using this feature even with ACID SQL databases, as the performance hint is big anyway.About threads: our experience shows that Redis is mostly I/O bound. I'm using threads to serve things from Virtual Memory. The long term solution to exploit all the cores, assuming your link is so fast that you can saturate a single core, is running multiple instances of Redis (no locks, almost fully scalable linearly with number of cores), and using the "Redis Cluster" solution that I plan to develop in the future.
简单翻译一下:
整数集合(intset)是一个有序的、存储整型数据的结构。
conding决定了的element的长度,对应关系如下:
typedef struct intset {
//编码
uint32_t encoding;
//元素个数
uint32_t length;
// 柔性数组,根据encoding 决定几个字节表示一个数组
int8_t contents[];
} intset;
4.2.1 查询元素
uint8_t intsetFind(intset *is, int64_t value) {
uint8_t valenc = _intsetValueEncoding(value); //判断编码方式
//编码方式如果大于当前intset的编码方式,直接返回0。否则调用intsetSearch函数进行查找
return valenc <= intrev32ifbe(is->encoding) && intsetSearch(is,value,NULL);
static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;
int64_t cur = -1;
/*如果intset中没有元素,直接返回0 */
if (intrev32ifbe(is->length) == 0) {
if (pos) *pos = 0;
return 0;
} else {
/* 如果元素大于最大值或者小于最小值,直接返回0 */
if (value > _intsetGet(is,max)) {
if (pos) *pos = intrev32ifbe(is->length);
return 0;
} else if (value < _intsetGet(is,0)) {
if (pos) *pos = 0;
return 0;
}
}
while(max >= min) { //二分查找该元素
mid = ((unsigned int)min + (unsigned int)max) >> 1;
cur = _intsetGet(is,mid);
if (value > cur) {
min = mid+1;
} else if (value < cur) {
max = mid-1;
} else {
break;
}
}
if (value == cur) { //查找到返回1,未查找到返回0
if (pos) *pos = mid;
return 1;
} else {
if (pos) *pos = min;
return 0;
}
}
}
4.2.2 插入元素
插入元素比较简单,不再赘述源代码,感兴趣的同学可以查看方法。
4.2.3 删除元素
删除元素比较简单,不再赘述源代码,感兴趣的同学可以查看方法。
intset *intsetAdd(intset *is, int64_t value, uint8_t *success);
当Redis集合类型的元素都是整数并且都处在64位有符号整数范围之内时,使用该结构体存储。
在两种情况下,底层编码会发生转换:
字典底层类似Java的HashMap,但是扩容的方式有一定的区别。
哈希表:
typedef struct dictht {
// 二维数组
dictEntry **table;
// table总大小
unsigned long size;
// 掩码=size-1
unsigned long sizemask;
// 已经保存的键值对
unsigned long used;
} dictht;
二维数组中的键值对:
typedef struct dictEntry {
//键
void *key;
//值
union {
void *val; //值
uint64_t u64;
int64_t s64; //过期时间
double d;
} v;
// hash冲突的next指针
struct dictEntry *next;
} dictEntry;
字典,使用Hash表包了一层:
typedef struct dict {
//操作类型
dictType *type;
// 依赖的数据
void *privdata;
// Hash表
dictht ht[2];
// -1代表没有进行rehash值,否则代表hash操作进行到了哪个索引
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
// 当前运行的迭代器数
unsigned long iterators; /* number of iterators currently running */
} dict;
5.2.1 添加元素
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing) /* 入参字典、键、Hash 表节点地址 */
{
long index;
dictEntry *entry;
dictht *ht;
if (dictIsRehashing(d)) _dictRehashStep(d); /* 该字典是否在进行 rehash 操作,是则执行一次 rehash */
if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1) /* 查找键,找到则直接返回 -1,并把老节点存入 existing 字段,否则把新节点的索引值返回。如果遇到 Hash 表容量不足,则进行扩容 */
return NULL;
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0]; /* 是否在进行 rehash 操作中,是则插入至散列表 ht[1] 中,否则插入散列表 ht[0] */
entry = zmalloc(sizeof(*entry)); /* 申请新节点内存 */
entry->next = ht->table[index]; /* 将该节点的 next 指针指向 ht->table[index] 指针指向的位置 */
ht->table[index] = entry; /* 将 ht->table[index] 指针指向该节点 */
ht->used++;
dictSetKey(d, entry, key); /* 给新节点存入键信息 */
return entry;
}
其中查找元素的代码:
dictHashKey(d,key), existing)// 根据字典的hash函数得到key的hash值
idx = hash & d->ht[table].sizemask; //利用key的hash值与掩码进行与操作(因为与操作的速度比取余快,也就是为什么要存一个掩码)
int dictExpand(dict *d, unsigned long size)
{
if (dictIsRehashing(d) || d->ht[0].used > size) /* 如果此时正在扩容,或者是扩容大小小于 ht[0] 的表大小,则抛错 */
return DICT_ERR;
dictht n; /* 新 hash 表 */
unsigned long realsize = _dictNextPower(size); /* 重新计算扩容后的值,必须为 2 的 N 次方幂 */
/* Rehashing to the same table size is not useful. */
if (realsize == d->ht[0].size) return DICT_ERR; /* 重新计算的值如果和原来的 size 相等,则无效 */
/* 分配新 Hash 表,并初始化所有指针为 NULL */
n.size = realsize;
n.sizemask = realsize-1;
n.table = zcalloc(realsize*sizeof(dictEntry*));
n.used = 0;
/* 初始化的情况,而不是进行 rehash 操作,就用 ht[0] 来接收值 */
if (d->ht[0].table == NULL) {
d->ht[0] = n;
return DICT_OK;
}
/* 准备第二个 Hash 表,以便执行渐进式哈希操作 */
d->ht[1] = n; /* 扩容后的新内存放入 ht[1] 中 */
d->rehashidx = 0; /* 非默认的 -1,表示需进行 rehash */
return DICT_OK;
}
redis中的key可能有成千上万,如果一次性扩容,会对性能造成巨大的影响,所以redis使用渐进式扩容,每次执行插入,删除,查找,修改等操作前,都先判断当前字典的rehash操作是否在进行,如果是在进行中,就对当前节点进行rehash操作,只执行一次。除此之外,当服务器空闲时,也会调用incrementallyRehash函数进行批量操作,每次100个节点,大概一毫秒。将rehash操作进行分而治之。
渐进式rehash源码:
int dictRehash(dict *d, int n) {
int empty_visits = n*10; /* 最大访问的空桶的数量,n*10 */
if (!dictIsRehashing(d)) return 0; /* dict 没有正在进行 rehash 时,直接返回 */
while(n-- && d->ht[0].used != 0) { /* n 为最多迁移元素数量 */
dictEntry *de, *nextde;
assert(d->ht[0].size > (unsigned long)d->rehashidx); /* 为防止 rehashidx 越界,当 rehashidx 大于 ht[0] 的数组大小时,不继续执行 */
while(d->ht[0].table[d->rehashidx] == NULL) { /* 当 rehashidx 位置的桶为空时,继续向下遍历,直到桶不为空或者达到最大访问空桶的数量 */
d->rehashidx++;
if (--empty_visits == 0) return 1; //最大访问空桶数量-1,若减完,则退出
}
de = d->ht[0].table[d->rehashidx];
while(de) { // 遍历桶中元素,移动元素至新表
uint64_t h;
nextde = de->next;
h = dictHashKey(d, de->key) & d->ht[1].sizemask;
de->next = d->ht[1].table[h]; // 头插法
d->ht[1].table[h] = de;
d->ht[0].used--;
d->ht[1].used++;
de = nextde;
}
d->ht[0].table[d->rehashidx] = NULL; // ht[0] 对应桶置为空
d->rehashidx++;
}
if (d->ht[0].used == 0) { // 检查是否已经 rehash 完成
zfree(d->ht[0].table);
d->ht[0] = d->ht[1];
_dictReset(&d->ht[1]);
d->rehashidx = -1;
return 0;
}
return 1;
}
5.2.3 查找元素
更新和删除操作大同小异,不在赘述。
redis使用字节数据表示压缩列表,尽最大可能节省空间。
其中,coding字段表示content的编码,其长度是动态变化的。如下表:
encoding字段第1个字节的前2位,可以判断content字段存储的是整数或者字节数组。当content存储的是字节数组时,后续字节标识字节数组的实际长度;当content存储的是整数时,可根据第3、第4位判断整数的具体类型。而当encoding字段标识当前元素存储的是0~12的立即数时,数据直接存储在encoding字段的最后4位,此时没有content字段。
举个例子:
因为解码过程比较繁琐,每次解码都需要性能损耗,为此定义了结构体zlentry,用于表示解码后的压缩列表元素:
typedef struct zlentry {
//previous_entry_length 长度
unsigned int prevrawlensize; /* Bytes used to encode the previous entry len*/
// previous_entry_length
unsigned int prevrawlen; /* Previous entry len. */
//encoding 长度
unsigned int lensize; /* Bytes used to encode this entry type/len.
For example strings have a 1, 2 or 5 bytes
header. Integers always use a single byte.*/
// 内容的长度
unsigned int len; /* Bytes used to represent the actual entry.
For strings this is just the string length
while for integers it is 1, 2, 3, 4, 8 or
0 (for 4 bit immediate) depending on the
number range. */
//首部长度
unsigned int headersize; /* prevrawlensize + lensize. */
//编码
unsigned char encoding; /* Set to ZIP_STR_* or ZIP_INT_* depending on
the entry encoding. However for 4 bits
immediate integers this can assume a range
of values and must be range-checked. */
// 当前元素的首地址
unsigned char *p; /* Pointer to the very start of the entry, that
is, this points to prev-entry-len field. */
} zlentry;
解码分为两步,解码previousentrylength和解码coding
#define ZIP_DECODE_PREVLEN(ptr, prevlensize, prevlen) do {
ZIP_DECODE_PREVLENSIZE(ptr, prevlensize);
// prevlensize=1时,则 ptr 的第一个字节标识上一个节点的长度
if ((prevlensize) == 1) {
(prevlen) = (ptr)[0];
} else if ((prevlensize) == 5) {
assert(sizeof((prevlen)) == 4);
// 如果 prevlensize = 5,取后面 4 个字节作为上一节点的长度
memcpy(&(prevlen), ((char*)(ptr)) + 1, 4);
memrev32ifbe(&prevlen);
}
} while(0);
#define ZIP_DECODE_LENGTH(ptr, encoding, lensize, len) do {
// 获取当前的编码类型
ZIP_ENTRY_ENCODING((ptr), (encoding));
// 如果编码类型为字节数组
if ((encoding) < ZIP_STR_MASK) {
// encoding == 00000000
if ((encoding) == ZIP_STR_06B) {
// 存储元素的长度数值所需要的字节数设置为 1
(lensize) = 1;
// 元素长度为 (ptr)[0] 和 111111 做位运算
(len) = (ptr)[0] & 0x3f;
// encoding == 10000000
} else if ((encoding) == ZIP_STR_14B) {
// 存储元素的长度数值所需要的字节数设置为 2
(lensize) = 2;
// 元素长度为 高八位:(ptr)[0] 和 111111 做位运算 低八位:(ptr)[1]
(len) = (((ptr)[0] & 0x3f) << 8) | (ptr)[1];
// encoding == 11000000
} else if ((encoding) == ZIP_STR_32B) {
// 存储元素的长度数值所需要的字节数设置为 5
(lensize) = 5;
// 元素长度为后 4 位
(len) = ((ptr)[1] << 24) |
(ptr)[2] << 16) |
(ptr)[3] << 8) |
((ptr)[4]);
} else {
panic("Invalid string encoding 0x%02X", (encoding));
}
} else {
// 数值类型长度存储为 1 字节
(lensize) = 1;
// 元素长度
(len) = zipIntSize(encoding);
}
} while(0);
static unsigned int zipPrevEncodeLength(unsigned char *p, unsigned int len) {
// 仅返回编码 len 所需的字节数量
if (p == NULL) {
return (len < ZIP_BIGLEN) ? 1 : sizeof(len)+1;
// 写入并返回编码 len 所需的字节数量
} else {
// 1 字节
if (len < ZIP_BIGLEN) {
p[0] = len;
return 1;
// 5 字节
} else {
// 添加 5 字节长度标识
p[0] = ZIP_BIGLEN;
// 写入编码
memcpy(p+1,&len,sizeof(len));
// 如果有必要的话,进行大小端转换
memrev32ifbe(p+1);
// 返回编码长度
return 1+sizeof(len);
}
}
}
重新回顾一下 如果前驱节点的长度小于254,那么preventrylen成员需要用1字节长度来保存这
个长度值。如果前驱节点的长度大于等于254,那么preventrylen成员需要用5字节长度来保存这个长度值。举个例子:
比如有这么连续的四个节点,大小都是253字节,当最前面加入一个大于254字节的节点,会导致后面的节点因为因为previousentrylength从一个字节变成五个字节而频繁扩容,每次扩容缩容都需要分配空间和复制数据,对性能损耗巨大。
因为连锁更新发生的概率十分十分低,所以redis并没有采取相关的措施去避免 最后附一张连锁更新的流程图。
redis中Stream流是Redis5.0以后新加入的数据结构,由生产者,消息,消费者,消费组四个部分组成。
生产者负责向消息队列中生产消息,消费者消费某个消息流 对于消费组,有以下三点:
listpack可以理解为一个字符串序列化队列,可以存储字符串或者整型c语言中没有定义listpack的结构体,因为listpack本身可以理解为是一个字符串数组。
其中encode编码字段,决定了后面的content的内容形式,具体如下表所示:
backlen所占用的每个字节的第一个bit用于标识;0代表结束,1代表尚未结束,每个字节只有7 bit有效,用于从后向前遍历,能够快速找到上一个元素的首字符。
该结构查找效率低下,所以只适合在结尾增删,这刚好符合消息队列的操作。
listpack中增删改操作都是用的同一个方法lpInsert,实现了在任意位置插入元素。删除操作转换为用空元素替换操作,代码比较多,主要介绍一下流程:
插入元素的流程也比较简单,就和在数组中插入一个元素类似,不做过多的介绍。
类似数组的遍历,从前向后遍历。
我们经常会用到前缀树来查找一个单词,查找时间复杂度是O(len(单词的长度))。
比如上图这样一棵前缀树,包含了两个单词,App,Apear。
这是一种典型的空间换时间的方式,但是每个节点存储一个字母,是不是有点浪费了呢,Rax的出现就解决了这个问题。
下图为使用Rax结构保存App和Apear结构:
其中使用带中括号表示的是非压缩节点,其它节点为压缩节点。另外,非压缩节点是按照字典序排序的。
typedef struct rax {
// 头结点
raxNode *head;
// 元素数量(key的数量)
uint64_t numele;
// 节点数量
uint64_t numnodes;
} rax;
typedef struct raxNode {
// 当前节点是否包含key
uint32_t iskey:1; /* Does this node contain a key? */
// 当前key对应的value是否为null
uint32_t isnull:1; /* Associated value is NULL (don't store it). */
// 是否压缩
uint32_t iscompr:1; /* Node is compressed. */
// 压缩节点长度或者非压缩节点个数
uint32_t size:29;
unsigned char data[];
}
压缩节点:
非压缩节点:
压缩节点与非压缩节点最大的不同,除了iscomper标志字段不同外,压缩节点只有最后一个字符有子节点,而非压缩节点每个字符都有子节点。
如果Stream底层将消息都存放在listpack中,会存在性能问题。当查询消息的时候,需要遍历listpack,插入消息的时候,需要重新分配一块很大的空间。
rax *raxNew(void) {
rax *rax = rax_malloc(sizeof(*rax));
if (rax == NULL) return NULL;
rax->numele = 0;
rax->numnodes = 1;
rax->head = raxNewNode(0,0);
if (rax->head == NULL) {
rax_free(rax);
return NULL;
} else {
return rax;
}
}
/**
* 根据key获取对应的value
* @param rax 待查找的rax
* @param s 待查找的key
* @param len s的长度
* @return
*/
void *raxFind(rax *rax, unsigned char *s, size_t len) {
raxNode *h;
debugf("### Lookup: %.*s\n", (int)len, s);
int splitpos = 0;
size_t i = raxLowWalk(rax,s,len,&h,NULL,&splitpos,NULL);
if (i != len || (h->iscompr && splitpos != 0) || !h->iskey)
return raxNotFound;
return raxGetData(h); //返回对应的value
}
可以看到,主要的代码在raxLowWalk方法中。
/**
*
* @param rax 待查找的rax
* @param s 待查找的key
* @param len s的长度
* @param stopnode 终止的节点 要么匹配完成,要么没找到。。
* @param plink 父节点指向stopnode的指针的地址
* @param splitpos 压缩节点的匹配位置
* @param ts 记录路径
* @return
*/
static inline size_t raxLowWalk(rax *rax, unsigned char *s, size_t len, raxNode **stopnode, raxNode ***plink, int *splitpos, raxStack *ts) {
// 从根节点开始查找
raxNode *h = rax->head;
raxNode **parentlink = &rax->head;
// 当前匹配字符位置
size_t i = 0; /* Position in the string. */
// 当前匹配节点位置
size_t j = 0; /* Position in the node children (or bytes if compressed).*/
// 当前节点有子节点且s字符串没有遍历完
while(h->size && i < len) {
debugnode("Lookup current node",h);
unsigned char *v = h->data;
if (h->iscompr) {
//压缩节点判断是否完全匹配
for (j = 0; j < h->size && i < len; j++, i++) {
if (v[j] != s[i]) break;
}
// 没有遍历完字符串,退出
if (j != h->size) break;
} else {
/* Even when h->size is large, linear scan provides good
* performances compared to other approaches that are in theory
* more sounding, like performing a binary search. */
// 非压缩节点
for (j = 0; j < h->size; j++) {
if (v[j] == s[i]) break;
}
// 未在非压缩节点找到字符串
if (j == h->size) break;
// 压缩节点可以匹配
i++;
}
// 记录路径
if (ts) raxStackPush(ts,h); /* Save stack of parent nodes. */
raxNode **children = raxNodeFirstChildPtr(h);
if (h->iscompr) j = 0; /* Compressed node only child is at index 0. */
// 移动到第j个子节点
memcpy(&h,children+j,sizeof(h));
parentlink = children+j;
j = 0; /* If the new node is compressed and we do not
iterate again (since i == l) set the split
position to 0 to signal this node represents
the searched key. */
}
debugnode("Lookup stop node is",h);
if (stopnode) *stopnode = h;
if (plink) *plink = parentlink;
if (splitpos && h->iscompr) *splitpos = j;
return i;
}
这里的步骤比较简单:
向rax中添加key-value对有两种方式,覆盖和不覆盖原有key对应的方法分别为:
int raxInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old) {
return raxGenericInsert(rax,s,len,data,old,1);
}
int raxTryInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old) {
return raxGenericInsert(rax,s,len,data,old,0);
}
两者都是调用同一个方法raxGenericInsert。该接口方法主要分为以下几步:
i = raxLowWalk(rax,s,len,&h,&parentlink,&j,NULL);
// 如果之前节点没有数据,就分配一个空间
if (!h->iskey || (h->isnull && overwrite)) {
h = raxReallocForData(h,data);
if (h) memcpy(parentlink,&h,sizeof(h));
}
if (h == NULL) {
errno = ENOMEM;
return 0;
}
/* Update the existing key if there is already one. */
// 更新数据
if (h->iskey) {
if (old) *old = raxGetData(h);
if (overwrite) raxSetData(h,data);
errno = 0;
return 0; /* Element already exists. */
}
有以下几种插入情况:
和第一种类似,只是右边的节点变成了非压缩节点。
上面列举了五种情况,可以分为两类:
源码中细节较多,不再细讲 感兴趣的同学可以查看。
int raxGenericInsert(rax *rax, unsigned char *s, size_t len, void *data, void**old, int overwrite)
删除接口用于删除rax中某个key,依旧拿redis源码中注释的例子 原来有这样一个rax,其有两个key,foo和foobar
也有以下几种情况:
stream流就像是一个消息链表,依赖于Rax结构和listpack结构,本节主要介绍消息流的增删查操作。
typedef struct stream {
// 指向rax树
rax *rax;
// 元素个数
uint64_t length;
// 指向最后一个消息
streamID last_id;
// 消费组
rax *cgroups;
} stream;
结构如下图所示:
redis提供streamAppendItem 函数,向stream中添加一个新的消息。
/**
*
* @param s 待插入的数据流
* @param argv 消息内容
* @param numfields 消息数量
* @param added_id 消息id
* @param use_id 调用方定义的id
* @return
*/
int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id)
代码比较多就不贴了,画一张流程图描述细节。
消费组也是保存在rax树中,以消费组的名称为key,消费组的streamCG结构为value。
treamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id) {
//当前消息流没有消费组,就新建一个
if (s->cgroups == NULL) s->cgroups = raxNew();
// 查找是否有重名消费组,有就直接返回
if (raxFind(s->cgroups,(unsigned char*)name,namelen) != raxNotFound)
return NULL;
// 新建消费组
streamCG *cg = zmalloc(sizeof(*cg));
cg->pel = raxNew();
cg->consumers = raxNew();
cg->last_id = *id;
// 将消费组插入到消费组树中
raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL);
return cg;
}
redis没有提供为消费组中新增消费者的方法。在查询消费者的时候,如果不存在,就会新增。
删除消息通过将listpack中消息的标志位设为已删除,并不是真正的删除。如果整个listpack的消息都被删除了,才会从rax中释放该节点。
void streamIteratorRemoveEntry(streamIterator *si, streamID *current) {
// 当前消息所在的listpack
unsigned char *lp = si->lp;
int64_t aux;
// 标记删除位
int flags = lpGetInteger(si->lp_flags);
flags |= STREAM_ITEM_FLAG_DELETED;
lp = lpReplaceInteger(lp,&si->lp_flags,flags);
//修改有效的消息数量
unsigned char *p = lpFirst(lp);
aux = lpGetInteger(p);
// 如果只有待删除的消息,就直接释放listpack
if (aux == 1) {
/* If this is the last element in the listpack, we can remove the whole
* node. */
lpFree(lp);
raxRemove(si->stream->rax,si->ri.key,si->ri.key_len,NULL);
} else {
// 更新统计信息
lp = lpReplaceInteger(lp,&p,aux-1);
// 查找删除节点p
p = lpNext(lp,p);
aux = lpGetInteger(p);
lp = lpReplaceInteger(lp,&p,aux+1);
// 更新listpack指针,可能因为扩容缩容而变化
if (si->lp != lp)
raxInsert(si->stream->rax,si->ri.key,si->ri.key_len,lp,NULL);
}
主要是利用rax查询接口:
streamCG *streamLookupCG(stream *s, sds groupname) {
if (s->cgroups == NULL) return NULL;
streamCG *cg = raxFind(s->cgroups,(unsigned char*)groupname,
sdslen(groupname));
return (cg == raxNotFound) ? NULL : cg;
}
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) {
// 在消费者的rax中查找指定的消费者
streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name,
sdslen(name));
if (consumer == raxNotFound) {
if (!create) return NULL;
// 如果没有找到,新建一个消费者,插入到消费者rax树中
consumer = zmalloc(sizeof(*consumer));
consumer->name = sdsdup(name);
consumer->pel = raxNew();
raxInsert(cg->consumers,(unsigned char*)name,sdslen(name),
consumer,NULL);
}
consumer->seen_time = mstime();
return consumer;
}
redis内部使用文件事件处理器file event handler,这个文件处理器是单线程的,所以我们经常说的redis是单线程模型。
客户端 Socket01 向 Redis 的 Server Socket 请求建立连接,此时 Server Socket 会产生一个 AEREADABLE 事件,IO 多路复用程序监听到 server socket 产生的事件后,将该事件压入队列中。文件事件分派器从队列中获取该事件,交给连接应答处理器。连接应答处理器会创建一个能与客户端通信的 Socket01,并将该 Socket01 的 AEREADABLE 事件与命令请求处理器关联。
假设此时客户端发送了一个 set key value 请求,此时 Redis 中的 Socket01 会产生 AEREADABLE 事件,IO 多路复用程序将事件压入队列,此时事件分派器从队列中获取到该事件,由于前面 Socket01 的 AEREADABLE 事件已经与命令请求处理器关联,因此事件分派器将事件交给命令请求处理器来处理。命令请求处理器读取 Scket01 的 set key value 并在自己内存中完成 set key value 的设置。操作完成后,它会将 Scket01 的 AE_WRITABLE 事件与令回复处理器关联。
如果此时客户端准备好接收返回结果了,那么 Redis 中的 Socket01 会产生一个 AEWRITABLE 事件,同样压入队列中,事件分派器找到相关联的命令回复处理器,由命令回复处理器对 socket01 输入本次操作的一个结果,比如 ok,之后解除 Socket01 的 AEWRITABLE 事件与命令回复处理器的关联。
redis4.0开始引入了多线程,除了主线程,redis有后台线程进行一些边缘的缓慢的操作,比如释放无用连接,rehash迁移等操作。
在redis6.0中,真正引入了多线程。
本文介绍了redis基本数据类型以及redis5.0以后的新特性。工作中我们也常常使用redis进行各种逻辑的处理,而了解其源码可以避免踩很多坑。另外,redis底层的设计也有很多值得学习的地方,比如更高效的使用内存和提升运算的时间复杂度,了解这些可以帮助我们在性能优化中有更多的思路。
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8