大话Redis系列--数据结构系列之zset类型

643次阅读  |  发布于3年以前

今天这篇文章中,我们主要是来聊聊关于Redis的zset数据结构。

聊这款数据结构之前,我想先讲讲自己为什么要学习这款数据结构的背后知识体系。

说实话,这个数据结构之前我可能只是单纯地知道它该怎么调用,然后在某些场景下其底层的实现原理是采用了ziplist,在某些场景下会使用到skiplist,但是具体的原理和细节,了解的并不深入。

但是最近的好几次项目需求开发中,发现在工作中开始渐渐使用到了这类数据结构,例如在落地推荐系统的时候,会将用户的信息匹配度使用zset结构来进行存储。在某些特殊的业务场景下使用了zset来存储心跳信息等等。

zset的使用介绍

redis 有序集合zset和集合set一样也是string类型元素的集合,且不允许重复的成员。

不同的是 zset 的每个元素都会关联一个分数(分数可以重复),redis 通过分数来为集合中 的成员进行从小到大的排序。

这是一种有序的集合,可以进行排序,但是内部的元素都是唯一的。它和普通的set不一样之处在于,该集合会给每个元素添加一个分数,排序的时候按照这个分数来进行排序。

插入元素

zadd key score member
> zadd test-zset 19 key-19
(integer) 1

获取相关元素的value值

zscore key member
> zscore test-zset key-19
19.0

获取某个元素在对应大小区间段内的排序信息

zrange key start stop [withscores]
> zrange test-zset 0 10 withscores
1) "key-19"
2) 19.0

将某个zset内部的集合按照得分大小进行排序

zrevrange key start end   
> zrange test-zset 0 10
1) "key-19"

移除某个zset内部的一些指定key

zrem key members
> zrem test-zset key-19
1

zset这类数据结构在底层实现的时候我们可以通过使用object encodeing和type指令观察一些基础属性。

来看接下来的这段代码案例:

127.0.0.1:6379> zadd zset-1 10 key1
(integer) 1
127.0.0.1:6379> zadd zset-2 11 key2
(integer) 1
127.0.0.1:6379> zadd zset-3 12 key3
(integer) 1
127.0.0.1:6379> zadd zset 12 key1
(integer) 1
127.0.0.1:6379> zadd zset 13 key2
(integer) 1
127.0.0.1:6379> zadd zset 14 key3
(integer) 1
127.0.0.1:6379> type zset
zset
127.0.0.1:6379> object encoding zset
"ziplist"
127.0.0.1:6379> zadd zset 15 ddddddddddddddddddddddddddddddddddddddddddd
ddddddddddddddddddddddddddddd
(integer) 1
127.0.0.1:6379> object encoding zset
"skiplist"
127.0.0.1:6379>

通过这段案例之后你会发现,当往zset中插入过长的元素之后,对应的zset编码格式会从ziplist转变为skiplist类型。

那么什么是skiplist呢?

SkipList的基本结构

直接通过一张图我们来了解下skipList数据结构长什么样子吧:

从上边的图中我们可以看到,跳表在进行设计的时候额外用了一部分的存储空间来存储索引信息数据。通过索引信息来查询对应的具体数据。

假设程序需要查询图中的40这个key,那么此时只需要通过匹配,查询的时候定位到23,30,34三个数字,然后便可定位到具体的区间段内,

跳表的核心思想是通过了空间换时间的思路来减少无谓的查询。

我们也可以归纳一下:

假设存储了n个元素的情况下,

第一层索引的节点个数:n/2

第二层索引的节点个数:n/2 ^ 2

第三层索引的节点个数:n/2 ^ 3

第k层索引的节点个数:n/2 ^ k

所以如果在使用skiplist的时候,一昧地给它新增索引层数,有可能会导致空间开销越来越大,因此索引的层数必须要有一个数字大小的上限控制。

Redis在底层是如何实现SkipList的?

zadd命令出处 server.c 代码文件中可以查找到:

    {"zadd",zaddCommand,-4,
     "write use-memory fast @sortedset",
     0,NULL,1,1,1,0,0,0},

zaddCommand点击进去深入了解其中的实现:

void zaddCommand(client *c) {
    zaddGenericCommand(c,ZADD_IN_NONE);
}

继续深入定位zaddGenericCommand函数:

/* This generic command implements both ZADD and ZINCRBY. */
void zaddGenericCommand(client *c, int flags) {
    static char *nanerr = "resulting score is not a number (NaN)";
    robj *key = c->argv[1];
    robj *zobj;
    sds ele;
    double score = 0, *scores = NULL;
    int j, elements, ch = 0;
    int scoreidx = 0;
    /* The following vars are used in order to track what the command actually
     * did during the execution, to reply to the client and to trigger the
     * notification of keyspace change. */
    int added = 0;      /* Number of new elements added. */
    int updated = 0;    /* Number of elements with updated score. */
    int processed = 0;  /* Number of elements processed, may remain zero with
                           options like XX. */

    /* Parse options. At the end 'scoreidx' is set to the argument position
     * of the score of the first score-element pair. */
    scoreidx = 2;
    while(scoreidx < c->argc) {
        char *opt = c->argv[scoreidx]->ptr;
        if (!strcasecmp(opt,"nx")) flags |= ZADD_IN_NX;
        else if (!strcasecmp(opt,"xx")) flags |= ZADD_IN_XX;
        else if (!strcasecmp(opt,"ch")) ch = 1; /* Return num of elements added or updated. */
        else if (!strcasecmp(opt,"incr")) flags |= ZADD_IN_INCR;
        else if (!strcasecmp(opt,"gt")) flags |= ZADD_IN_GT;
        else if (!strcasecmp(opt,"lt")) flags |= ZADD_IN_LT;
        else break;
        scoreidx++;
    }

    /* Turn options into simple to check vars. */
    int incr = (flags & ZADD_IN_INCR) != 0;
    int nx = (flags & ZADD_IN_NX) != 0;
    int xx = (flags & ZADD_IN_XX) != 0;
    int gt = (flags & ZADD_IN_GT) != 0;
    int lt = (flags & ZADD_IN_LT) != 0;

    /* After the options, we expect to have an even number of args, since
     * we expect any number of score-element pairs. */
    elements = c->argc-scoreidx;
    if (elements % 2 || !elements) {
        addReplyErrorObject(c,shared.syntaxerr);
        return;
    }
    elements /= 2; /* Now this holds the number of score-element pairs. */

    /* Check for incompatible options. */
    if (nx && xx) {
        addReplyError(c,
            "XX and NX options at the same time are not compatible");
        return;
    }

    if ((gt && nx) || (lt && nx) || (gt && lt)) {
        addReplyError(c,
            "GT, LT, and/or NX options at the same time are not compatible");
        return;
    }
    /* Note that XX is compatible with either GT or LT */

    if (incr && elements > 1) {
        addReplyError(c,
            "INCR option supports a single increment-element pair");
        return;
    }

    /* Start parsing all the scores, we need to emit any syntax error
     * before executing additions to the sorted set, as the command should
     * either execute fully or nothing at all. */
    scores = zmalloc(sizeof(double)*elements);
    for (j = 0; j < elements; j++) {
        if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
            != C_OK) goto cleanup;
    }

    /* Lookup the key and create the sorted set if does not exist. */
    zobj = lookupKeyWrite(c->db,key);
    if (checkType(c,zobj,OBJ_ZSET)) goto cleanup;
    if (zobj == NULL) {
        if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
        if (server.zset_max_ziplist_entries == 0 ||
            server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
        {
            zobj = createZsetObject();
        } else {
        // 这里可以看到当某个元素的value值大于一定阈值的时候,就会采用skiplist数据结构
            zobj = createZsetZiplistObject();
        }
        //将该数据结构添加到字典里面去
        dbAdd(c->db,key,zobj);
    }

    for (j = 0; j < elements; j++) {
        double newscore;
        score = scores[j];
        int retflags = 0;

        ele = c->argv[scoreidx+1+j*2]->ptr;
        //追加元素
        int retval = zsetAdd(zobj, score, ele, flags, &retflags, &newscore);
        if (retval == 0) {
            addReplyError(c,nanerr);
            goto cleanup;
        }
        if (retflags & ZADD_OUT_ADDED) added++;
        if (retflags & ZADD_OUT_UPDATED) updated++;
        if (!(retflags & ZADD_OUT_NOP)) processed++;
        score = newscore;
    }
    server.dirty += (added+updated);

reply_to_client:
    if (incr) { /* ZINCRBY or INCR option. */
        if (processed)
            addReplyDouble(c,score);
        else
            addReplyNull(c);
    } else { /* ZADD. */
        addReplyLongLong(c,ch ? added+updated : added);
    }

cleanup:
    zfree(scores);
    if (added || updated) {
        signalModifiedKey(c,c->db,key);
        notifyKeyspaceEvent(NOTIFY_ZSET,
            incr ? "zincr" : "zadd", key, c->db->id);
    }
}

通过上述的代码阅读,主要关注的是一个叫做zsetAdd函数,

int zsetAdd(robj *zobj, double score, sds ele, int in_flags, int *out_flags, double *newscore)

这个函数的底层内部涉及到了插入数据的判断,内部有一个核心模块叫做zzlInset

int zsetAdd(robj *zobj, double score, sds ele, int in_flags, int *out_flags, double *newscore) {
    /* Turn options into simple to check vars. */
    int incr = (in_flags & ZADD_IN_INCR) != 0;
    int nx = (in_flags & ZADD_IN_NX) != 0;
    int xx = (in_flags & ZADD_IN_XX) != 0;
    int gt = (in_flags & ZADD_IN_GT) != 0;
    int lt = (in_flags & ZADD_IN_LT) != 0;
    *out_flags = 0; /* We'll return our response flags. */
    double curscore;

    /* NaN as input is an error regardless of all the other parameters. */
    if (isnan(score)) {
        *out_flags = ZADD_OUT_NAN;
        return 0;
    }

    /* Update the sorted set according to its encoding. */
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *eptr;

        if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
            /* NX? Return, same element already exists. */
            if (nx) {
                *out_flags |= ZADD_OUT_NOP;
                return 1;
            }

            /* Prepare the score for the increment if needed. */
            if (incr) {
                score += curscore;
                if (isnan(score)) {
                    *out_flags |= ZADD_OUT_NAN;
                    return 0;
                }
            }

            /* GT/LT? Only update if score is greater/less than current. */
            if ((lt && score >= curscore) || (gt && score <= curscore)) {
                *out_flags |= ZADD_OUT_NOP;
                return 1;
            }

            if (newscore) *newscore = score;

            /* Remove and re-insert when score changed. */
            if (score != curscore) {
                zobj->ptr = zzlDelete(zobj->ptr,eptr);
                zobj->ptr = zzlInsert(zobj->ptr,ele,score);
                *out_flags |= ZADD_OUT_UPDATED;
            }
            return 1;
        } else if (!xx) {
            /* Optimize: check if the element is too large or the list
             * becomes too long *before* executing zzlInsert. */
            zobj->ptr = zzlInsert(zobj->ptr,ele,score);
            if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries ||
                sdslen(ele) > server.zset_max_ziplist_value)
                zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
            if (newscore) *newscore = score;
            *out_flags |= ZADD_OUT_ADDED;
            return 1;
        } else {
            *out_flags |= ZADD_OUT_NOP;
            return 1;
        }
    }
    //跳表类型数据结构的插入
    else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplistNode *znode;
        dictEntry *de;
        //查找是否存在该类数据结构
        de = dictFind(zs->dict,ele);
        if (de != NULL) {
            /* NX? Return, same element already exists. */
            if (nx) {
                *out_flags |= ZADD_OUT_NOP;
                return 1;
            }
            //查询对应元素的得分
            curscore = *(double*)dictGetVal(de);

            /* Prepare the score for the increment if needed. */
            if (incr) {
                score += curscore;
                if (isnan(score)) {
                    *out_flags |= ZADD_OUT_NAN;
                    return 0;
                }
            }

            /* GT/LT? Only update if score is greater/less than current. */
            if ((lt && score >= curscore) || (gt && score <= curscore)) {
                *out_flags |= ZADD_OUT_NOP;
                return 1;
            }

            if (newscore) *newscore = score;

            /* Remove and re-insert when score changes. */
            if (score != curscore) {
                //如果元素相同,但是得分不同,则需要更新该元素的得分 先删除再插入
                znode = zslUpdateScore(zs->zsl,curscore,ele,score);
                /* Note that we did not removed the original element from
                 * the hash table representing the sorted set, so we just
                 * update the score. */
                dictGetVal(de) = &znode->score; /* Update score ptr. */
                *out_flags |= ZADD_OUT_UPDATED;
            }
            return 1;
        } else if (!xx) {
            ele = sdsdup(ele);
            znode = zslInsert(zs->zsl,score,ele);
            serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
            *out_flags |= ZADD_OUT_ADDED;
            if (newscore) *newscore = score;
            return 1;
        } else {
            *out_flags |= ZADD_OUT_NOP;
            return 1;
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
    return 0; /* Never reached. */
}

从源代码可以看到zset的底层编码会随着元素的数目和member的长度大小发生变化:

这段函数的内部具体逻辑也是比较复杂的,内部有一个if判断,其中当当前采用了SkipList的编码格式,则会判断所需要插入的元素是否已存在,如果是已经存在的元素,则只需要更新对应的score得分数值即可,这部分更新操作被作者封装在了这么一个函数当中:

zskiplistNode *zslUpdateScore(zskiplist *zsl, double curscore, sds ele, double newscore) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    /* We need to seek to element to update to start: this is useful anyway,
     * we'll have to update or remove it. */
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
                (x->level[i].forward->score < curscore ||
                    (x->level[i].forward->score == curscore &&
                     sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            x = x->level[i].forward;
        }
        update[i] = x;
    }

    /* Jump to our element: note that this function assumes that the
     * element with the matching score exists. */
    x = x->level[0].forward;
    serverAssert(x && curscore == x->score && sdscmp(x->ele,ele) == 0);

    /* If the node, after the score update, would be still exactly
     * at the same position, we can just update the score without
     * actually removing and re-inserting the element in the skiplist. */
    if ((x->backward == NULL || x->backward->score < newscore) &&
        (x->level[0].forward == NULL || x->level[0].forward->score > newscore))
    {
        x->score = newscore;
        return x;
    }

    //关键点 先删除掉,再插入一个新的元素
    /* No way to reuse the old node: we need to remove and insert a new
     * one at a different place. */
    zslDeleteNode(zsl, x, update);
    zskiplistNode *newnode = zslInsert(zsl,newscore,x->ele);
    /* We reused the old node x->ele SDS string, free the node now
     * since zslInsert created a new one. */
    x->ele = NULL;
    zslFreeNode(x);
    return newnode;
}

通过阅读了这段代码之后,我才明白,当频繁对一个zset进行相通key的score进行更新的时候,其底层是会对该元素进行先删除再更新的操作。

哦,那么是否说这样的删除插入操作会对整个结构的性能造成影响呢?(因为工作中确实有在项目里面这么使用过,客户端频繁发送请求去更新zset内部的一个score数值)后来查阅了相关资料发现:

在 Redis 5.0 版本中,Redis 的作者 Antirez 优化了这个更新的过程,目前的更新过程是如果判断这个 value是否存在,如果存在的话就直接更新,然后再调整整个跳跃表的 score 排序,这样就不需要两次的搜索过程。

如果存在相同scope的元素时候,该如何排序?

在比较时,不仅比较分数(相当于skiplist的key),还比较数据本身。在Redis的skiplist实现中,数据本身的内容唯一标识这份数据,而不是由key来唯一标识。另外,当多个元素分数相同的时候,还需要根据数据内容来进字典排序。

接下来我们继续深入学习。

在skiplist数据结构的底层当中,有一个叫做跳表节点的概念,核心源代码部分为以下模块:

/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
    sds ele;
    double score;
    struct zskiplistNode *backward;
    struct zskiplistLevel {
        struct zskiplistNode *forward;
        unsigned long span; //每个索引层之间的索引节点之间的数据间隔
    } level[]; //索引层
} zskiplistNode;

score 是对应的得分,

ele 是对应的key,

level[] 是对应的索引层集合,

forward 这个指针用于指向下一个元素,

backward 这个指针用于指向上一个元素,

span 是两个不同索引节点之间的间隔数目

可以看出在底层的链表当中,元素之间的链接是通过指针的方式进行引用,从这里可以看出zset底层存储的数据是存储在了一个链式结构中,而并非是数组结构中,这种结构的好处在于频繁插入或者更新的时候只需要做简单的指针调整即可,相比于数组而言开销要低很多。

深入看看zskiplist这个结构体当中,结构体内部有一个头结点指针,还有一个尾节点指针,设计两个不同类型的指针有个好处就是在对队列元素按照得分排序查询的时候,不论是倒序还是顺序排序都能够很快就计算出来。

typedef struct zskiplist {
    //zrange和zrerange的遍历顺序关键是根据这两个指针去处理的
    struct zskiplistNode *header, *tail;
    unsigned long length; // 索引的个数
    int level //层高度
} zskiplist;

关键信息在源代码当中的一些XXXRange函数中可以看到相关的影子。

最后我们再看看zSet集合的结构体设计:

typedef struct zset {
    dict *dict; //为了快速索引到对应记录
    zskiplist *zsl; //存储有序的数据
} zse

从这个结构体中可以看出zset结构中有对应的zskiplist类型。现在我们再去使用zset结构之后可能就会更加了解内部的机制了。

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8