分布式 ID 生成系统 Leaf 的设计思路,源码解读

597次阅读  |  发布于2年以前

小伙伴们好呀, 今天来分享下最近研究的分布式 ID 生成系统 —— Leaf ,一起来思考下这个分布式ID的设计吧

什么是分布式ID?

ID 最大的特点是 唯一

而分布式 ID,就是指分布式系统下的 ID,它是 全局唯一 的。

为啥需要分布式ID呢?

这就和 唯一 息息相关了。

比如我们用 MySQL 存储数据,一开始数据量不大,但是业务经过一段时间的发展,单表数据每日剧增,最终突破 1000w,2000w …… 系统开始变慢了,此时我们已经尝试了 优化索引读写分离升级硬件升级网络 等操作,但是 单表瓶颈 还是来了,我们只能去 分库分表 了。

而问题也随着而来了,分库分表后,如果还用 数据库自增ID 的方式的话,那么在用户表中,就会出现 两个不同的用户有相同的ID 的情况,这个是不能接受的。

分布式ID全局唯一 的特点,正是我们所需要的。

分布式ID的生成方式

基本就上面几种了,UUID 的最大缺点就是太长,36个字符长度,而且无序,不适合。

而其他两种的缺点还有办法补救,可能这也是 Leaf 提供这两种生成 ID 方式的原因。

项目简介

Leaf ,分布式 ID 生成系统,有两种生成 ID 的方式:

  1. 号段模式
  2. Snowflake模式

号段模式

数据库自增ID 的基础上进行优化

  1. 增加一个 segement ,减少访问数据库的次数。
  2. 双 Buffer 优化,提前缓存下一个 Segement,降低网络请求的耗时(降低系统的TP999指标)

来自美团技术团队

biz_tag用来区分业务,max_id表示该biz_tag目前所被分配的ID号段的最大值,step表示每次分配的号段长度

没优化前,每次都从 db 获取,现在获取的频率和 step 字段相关。

双 Buffer 优化思路

号段模式源码解读

SegmentService 构造方法

作用

  1. 配置 dataSource
  2. 设置 MyBatis
  3. 实例化 SegmentIDGenImpl
  4. 执行 init 方法

这段代码我也忘了 哈哈,已经多久没直接用 mybatis 了,还是重新去官网翻看的。

mybatis 官网例子

实例化 SegmentIDGenImpl 时,其中有两个变量要留意下

  1. SEGMENT_DURATION,智能调节 step 的关键
  2. cache ,其中 SegmentBuffer 是双 Buffer 的关键设计。

这里先不展开,看看 init 方法先。

SegmentIDGenImpl init 方法

作用

  1. 执行 updateCacheFromDb 方法
  2. 开后台线程,每分钟执行一次 updateCacheFromDb() 方法

显然,核心在 updateCacheFromDb

updateCacheFromDb 方法

这里就直接看源码和我加的注释

private void updateCacheFromDb() {
        logger.info("update cache from db");
        StopWatch sw = new Slf4JStopWatch();
        try {
            // 执行 SELECT biz_tag FROM leaf_alloc 语句,获取所有的 业务字段。
            List<String> dbTags = dao.getAllTags();
            if (dbTags == null || dbTags.isEmpty()) {
                return;
            }
            // 缓存中的 biz_tag
            List<String> cacheTags = new ArrayList<String>(cache.keySet());
            // 要插入的 db 中的 biz_tag
            Set<String> insertTagsSet = new HashSet<>(dbTags);
            // 要移除的缓存中的 biz_tag 
            Set<String> removeTagsSet = new HashSet<>(cacheTags);

            // 缓存中有的话,不用再插入,从 insertTagsSet 中移除
            for (int i = 0; i < cacheTags.size(); i++) {
                String tmp = cacheTags.get(i);
                if (insertTagsSet.contains(tmp)) {
                    insertTagsSet.remove(tmp);
                }
            }

            // 为新增的 biz_tag 创建缓存 SegmentBuffer
            for (String tag : insertTagsSet) {
                SegmentBuffer buffer = new SegmentBuffer();
                buffer.setKey(tag);
                Segment segment = buffer.getCurrent();
                segment.setValue(new AtomicLong(0));
                segment.setMax(0);
                segment.setStep(0);
                cache.put(tag, buffer);
                logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);
            }


            // db中存在的,从要移除的 removeTagsSet 移除。
            for (int i = 0; i < dbTags.size(); i++) {
                String tmp = dbTags.get(i);
                if (removeTagsSet.contains(tmp)) {
                    removeTagsSet.remove(tmp);
                }
            }

            // 从 cache 中移除不存在的 bit_tag。
            for (String tag : removeTagsSet) {
                cache.remove(tag);
                logger.info("Remove tag {} from IdCache", tag);
            }
        } catch (Exception e) {
            logger.warn("update cache from db exception", e);
        } finally {
            sw.stop("updateCacheFromDb");
        }
    }

执行完后,会出现这样的 log

Add tag leaf-segment-test from db to IdCache, SegmentBuffer SegmentBuffer{key='leaf-segment-test', segments=[Segment(value:0,max:0,step:0), Segment(value:0,max:0,step:0)], currentPos=0, nextReady=false, initOk=false, threadRunning=false, step=0, minStep=0, updateTimestamp=0}

最后 init 方法结束后,会将 initOk 设置为 true


项目启动完毕后,我们就可以调用这个 API 了。

如图,访问 LeafController 中的 Segment API,可以获取到一个 id。

SegmentIDGenImpl get 方法

可以看到,init 不成功会报错。

以及会直接从 cache 中查找这个 key(biz_tag) , 没有的话会报错。

拿到这个 SegmentBuffer 时,还得看看它 init 了 没有,没有的话用双检查锁的方式去更新

先来看下一眼 SegmentBuffer 的结构

SegmentBuffer 类

⭐updateSegmentFromDb 方法

这里就是更新缓存的方法了,主要是更新 Segment 的 value , max,step 字段。

可以看到有三个 if 分支,下面展开说

分支一:初始化

第一次,buffer 还没 init,如上图,执行完后会更新 SegmentBuffer 的 step 和 minStep 字段。

分支二:第二次更新

这里主要是更新这个 updateTimestamp ,它的作用看分支三

分支三:剩下的更新

这里就比较有意思了,就是说如果这个号段在 15分钟 内用完了,那么它会扩大这个 step (不超过 10w),创建一个更大的 MaxId ,降低访问 DB 的频率。

那么,到这里,我们完成了 updateSegmentFromDb 方法,更新了 Segment 的 value , max,step 字段。

但是,我们不是每次 get 都走上面的流程,它还得走这个缓存方法

⭐getIdFromSegmentBuffer 方法

显然,这是另一个重点。

如图,在死循环中,先获取读锁,拿到当前的号段 Segment,进行判断

这里要重点留意 读写锁的使用 ,比如 开新线程时,使用了这个 写锁 ,里面的 nextReady 等变量使用了 volatile 修饰

这里的核心就是切换 Segment。

至此,号段模式结束。

优缺点

信息安全如果ID是连续的,恶意用户的扒取工作就非常容易做了,直接按照顺序下载指定URL即可;如果是订单号就更危险了,竞对可以直接知道我们一天的单量。所以在一些应用场景下,会需要ID无规则、不规则。—— 《Leaf——美团点评分布式ID生成系统》

美团可以看到,这个号段模式的最大弊端就是 信息不安全,所以在使用时得三思,能不能用到这些业务中去。


Snowflake模式

雪花算法,核心就是将 64bit 分段,用来表示时间,机器,序列号等。

41-bit的时间可以表示(1L<<41)/(1000L360024*365)=69年的时间,10-bit机器可以分别表示1024台机器。

12个自增序列号可以表示2^12个ID,理论上snowflake方案的QPS约为 2^12 * 1000 = 409.6w/s

这里使用 Zookeeper 持久顺序节点的特性自动对 snowflake 节点配置 wokerID,不用手动配置。

时钟回拨问题

img

Snowflake模式源码解读

这部分源码就不一一展开了,直接展示核心代码

SnowflakeZookeeperHolder init 方法

这里要注意调整这个 connectionTimeoutMs 和 sessionTimeoutMs ,不然两种模式都启动的话,这个 zk 的 session 可能会超时,造成启动失败。

图中流程

  1. 看看 zk 节点存不存在,不存在就创建
  2. 同时将 worker id 保存到本地。
  3. 创建定时任务,更新 znode。

znode

worker Id

定时任务

SnowflakeIDGenImpl get 方法

这里直接看代码和注释了

@Override
    public synchronized Result get(String key) {
        long timestamp = timeGen();
        //  发生了回拨,此刻时间小于上次发号时间
        if (timestamp < lastTimestamp) {
            long offset = lastTimestamp - timestamp;
            if (offset <= 5) {
                try {
                    //时间偏差大小小于5ms,则等待两倍时间
                    wait(offset << 1);
                    timestamp = timeGen();
                    //还是小于,抛异常并上报
                    if (timestamp < lastTimestamp) {
                        return new Result(-1, Status.EXCEPTION);
                    }
                } catch (InterruptedException e) {
                    LOGGER.error("wait interrupted");
                    return new Result(-2, Status.EXCEPTION);
                }
            } else {
                return new Result(-3, Status.EXCEPTION);
            }
        }
        if (lastTimestamp == timestamp) {
            // sequenceMask = ~(-1L << 12 ) = 4095 二进制即 12 个1
            sequence = (sequence + 1) & sequenceMask;
            if (sequence == 0) {
                //seq 为0的时候表示是下一毫秒时间开始对seq做随机
                sequence = RANDOM.nextInt(100);
                timestamp = tilNextMillis(lastTimestamp);
            }
        } else {
            //如果是新的ms开始
            sequence = RANDOM.nextInt(100);
        }
        lastTimestamp = timestamp;
        // timestampLeftShift = 22, workerIdShift = 12 
        long id = ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) | sequence;
        return new Result(id, Status.SUCCESS);
    }

    protected long tilNextMillis(long lastTimestamp) {
        long timestamp = timeGen();
        while (timestamp <= lastTimestamp) {
            timestamp = timeGen();
        }
        return timestamp;
    }

    protected long timeGen() {
        return System.currentTimeMillis();
    }

API 效果

生成 ID

反解 ID

至此,这个 Snowflake 模式也了解完毕了。

总结

看完上面两种模式,我觉得两种模式都有它适用的场景,号段模式更适合对内使用(比如 用户ID),而如果你这个 ID 会被用户看到,暴露出去有其他风险(比如爬虫恶意爬取等),那就得多斟酌了,。而订单号 就更适合用 snowflake 模式。

分布式ID 的特点

  1. 全局唯一
  2. 趋势递增(有序一直很重要,粗略有序还是严格有序就看情况了)
  3. 可反解(可选)
  4. 信息安全(可选)

参考资料

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8