Redisson分布式锁学习总结:读锁 RedissonReadLock#unLock 释放锁源码分析

861次阅读  |  发布于2年以前

一、RedissonReadLock#unlock 源码分析

上一篇已经简单介绍了,redisson 提供的读写锁 RReadWriteLock 的使用demo、使用场景、和RedissonLock 的关系;也深入分析了读锁 RedissonReadLock 加锁 lua 脚本的执行逻辑、watchdog 机制 lua 脚本的执行逻辑。

下面,我们将继续分析读锁 RedissonReadLock 释放锁时,lua 脚本是怎么执行的。

1、RedissonReadLock 释放锁 lua 脚本分析

分析前,我们定一下加锁的key:

RReadWriteLock readWriteLock = client.getReadWriteLock("myLock");

RedissonReadLock#unlockInnerAsync:

@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
    String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);

    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "local mode = redis.call('hget', KEYS[1], 'mode'); " +
            "if (mode == false) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
            "if (lockExists == 0) then " +
                "return nil;" +
            "end; " +

            "local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " + 
            "if (counter == 0) then " +
                "redis.call('hdel', KEYS[1], ARGV[2]); " + 
            "end;" +
            "redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +

            "if (redis.call('hlen', KEYS[1]) > 1) then " +
                "local maxRemainTime = -3; " + 
                "local keys = redis.call('hkeys', KEYS[1]); " + 
                "for n, key in ipairs(keys) do " + 
                    "counter = tonumber(redis.call('hget', KEYS[1], key)); " + 
                    "if type(counter) == 'number' then " + 
                        "for i=counter, 1, -1 do " + 
                            "local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " + 
                            "maxRemainTime = math.max(remainTime, maxRemainTime);" + 
                        "end; " + 
                    "end; " + 
                "end; " +

                "if maxRemainTime > 0 then " +
                    "redis.call('pexpire', KEYS[1], maxRemainTime); " +
                    "return 0; " +
                "end;" + 

                "if mode == 'write' then " + 
                    "return 0;" + 
                "end; " +
            "end; " +

            "redis.call('del', KEYS[1]); " +
            "redis.call('publish', KEYS[2], ARGV[1]); " +
            "return 1; ",
            Arrays.<Object>asList(getName(), getChannelName(), timeoutPrefix, keyPrefix), 
            LockPubSub.UNLOCK_MESSAGE, getLockName(threadId));
}

我们可以看到,这读锁释放锁的lua脚本还是比较长的,但是我们也不用着急,一步一步分析就可以了。

1.1、KEYS

Arrays.asList(getName(), getChannelName(), timeoutPrefix, keyPrefix):

  • getName(): 锁key
  • getChannelName():prefixName("redisson_rwlock", getName()) -> redisson_rwlock:{myLock}
  • timeoutPrefix:getReadWriteTimeoutNamePrefix(threadId) -> suffixName(getName(), getLockName(threadId)) + ":rwlock_timeout" -> {myLock}:UUID-1:threadId-1:rwlock_timeout
  • keyPrefix:getKeyPrefix(threadId, timeoutPrefix) -> timeoutPrefix.split(":" + getLockName(threadId))[0] -> {myLock}

KEYS:["myLock","redisson_rwlock:{myLock}","{myLock}:UUID-1:threadId-1:rwlock_timeout","{myLock}"]

1.2、ARGVS

LockPubSub.UNLOCK_MESSAGE, getLockName(threadId)

  • LockPubSub.UNLOCK_MESSAGE:0L
  • getLockName(threadId):return id + ":" + threadId,客户端ID(UUID):线程ID(threadId)

ARGVS:[0L,"UUID:threadId"]

1.3、lua 脚本分析

1、分支一:锁模式不存在,往锁对应的channel发送消息

场景:

  • 如果锁模式不存在,那么证明没有线程持有读写锁
  • 当前线程即使没有持有锁,但还是调用了释放锁的方法

lua脚本:

"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
    "redis.call('publish', KEYS[2], ARGV[1]); " +
    "return 1; " +
"end; "

分析:

1 . 利用 hget 命令获取读写锁的模式

 hget myLock mode

2 . 如果锁模式为空,往读写锁对应的channel发送释放锁的消息,然后返回1,lua脚本执行完毕

 publish redisson_rwlock:{myLock} 0

channel 发布锁释放消息的用处:

  • 其实在 Redisson 提供的各种分布式锁中,不管是可重入锁、公平锁,还是到现在的读写锁,都会利用 redis 的pub/sub机制来做下面的通知机制。
  • 在线程获取锁失败的时候,在等待前会先订阅锁对应的 channel,然后进入等待状态。
  • 如果当前线程成功释放锁,那么会在锁对应的 channel 发布释放锁的消息;假设此时有其他线程在等待获取锁,那么就会接收到 channel 里释放锁的消息,提前跳出等待状态,去获取锁。
  • 关于锁channel,要注意的是:可重入锁和读写锁,等待线程都是订阅同一个channel;而公平锁不是,公平锁是每个等待线程都订阅自己指定的channel,从而做到公平。

2、分支二:锁存在,但当前线程没有持有锁

场景:

  • 锁存在,但当前线程没有持有锁

lua脚本:

"local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
"if (lockExists == 0) then " +
    "return nil;" +
"end; "

分析:

1 . 利用 hexists 命令判断当前线程是否持有锁

hexists myLock UUID-1:threadId-1

2 . 如果不存在直接返回null,表示释放锁失败

3、分支三:锁存在且当前线程持有锁

场景:

  • 锁存在且当前线程持有锁
  • 如果持有锁次数只有1,那么会删除锁集合中的加锁次数记录
  • 最后,删除持有锁对应的加锁超时记录

lua脚本:

"local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " + 
"if (counter == 0) then " +
    "redis.call('hdel', KEYS[1], ARGV[2]); " + 
"end;" +
"redis.call('del', KEYS[3] .. ':' .. (counter+1)); "

分析: 分析前,我们假设线程 UUID-1:threadId-1 持有两个读锁,那么 redis 中锁相关数据如下:

myLock:{
    "mode":"read",
    "UUID-1:threadId-1":2
}
{myLock}:UUID-1:threadId-1:rwlock_timeout:1 1
{myLock}:UUID-1:threadId-1:rwlock_timeout:2 1

1 . 利用 hincrby 命令,给当前线程持有锁数量减1

 hincrby myLock UUID-1:threadId-1 -1

2 . 如果持有锁数量减1后等于0,证明当前线程不再持有锁,那么利用 hdel 命令将锁map中加锁次数记录删掉

  hdel myLock UUID:threadId

3 . 删除线程持有锁对应的加锁超时记录

del {myLock}:UUID-1:threadId-1:rwlock_timeout:count+1

分析后,由于我们假设了当前线程持有锁两次,所以只会执行步骤1和步骤3,执行后redis 中锁相关数据如下:

myLock:{
"mode":"read",
"UUID-1:threadId-1":1
}
{myLock}:UUID-1:threadId-1:rwlock_timeout:1 1

到这里,我们可能第一时间会有点疑惑:为什么给读锁扣减不需要先判断锁的模式?

其实在前一篇文章中,我们就提及到两点:

  1. 在锁map中记录加锁次数时,读锁的key是UUID:threadId,即客户端ID:线程ID;而写锁的key是UUID:threadId:write,即客户端ID:线程ID:write,那么就是说读锁的key和写锁的key是不一样的。所以解锁的时候,直接使用对应key来扣减持有锁次数即可。
  2. 还有一点很重要的是,相同线程,如果获取了写锁后,还是可以继续获取读锁的。所以只需要判断锁map有读锁加锁次数记录即可,就可以判断当前线程是持有读锁的,并不需要关心当前锁的模式。

4、分支四:给当前锁刷新过期时间

场景:

  • 当前线程释放一次读锁后,锁map还存在加锁次数记录
  • 有可能是当前线程还持有读锁/写锁,或者当前线程或其他线程同时持有读锁

lua脚本:

"if (redis.call('hlen', KEYS[1]) > 1) then " +
    "local maxRemainTime = -3; " + 
    "local keys = redis.call('hkeys', KEYS[1]); " + 
    "for n, key in ipairs(keys) do " + 
        "counter = tonumber(redis.call('hget', KEYS[1], key)); " + 
        "if type(counter) == 'number' then " + 
            "for i=counter, 1, -1 do " + 
                "local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " + 
                "maxRemainTime = math.max(remainTime, maxRemainTime);" + 
            "end; " + 
        "end; " + 
    "end; " +

    "if maxRemainTime > 0 then " +
        "redis.call('pexpire', KEYS[1], maxRemainTime); " +
        "return 0; " +
    "end;" + 

    "if mode == 'write' then " + 
        "return 0;" + 
    "end; " +
"end; "

分析:

1 . 利用 hlen 获取锁map中key的数量

  hlen myLock

2 . 如果锁map中 key 的数量还是大于1,那么证明还有线程持有锁,遍历锁map集合中的加锁次数key,根据加锁超时记录获取最大的超时时间

 hkeys myLock

a . 拼接 key 对应的加锁记录对应的超时时间,利用 pttl 获取超时时间

b . 与 maxRemainTime 对比,获取当前最大的超时时间,赋值给 maxRemainTime

c . 利用 hget 命令获取key对应的加锁次数

ⅲ . 遍历步骤2获取到的keys

 hget myLock key

ⅳ . 遍历加锁次数

pttl {myLock}:UUID-1:threadId-1:rwlock_timeout:num -> remainTime

 maxRemainTime = math.max(remainTime, maxRemainTime)

ⅵ. 设置 maxRemainTime 为 -3

ⅶ. 利用 hkeys 命令获取锁map中所有key

3 . 判断 maxRemainTime 是否大于0,如果大于0,给锁重新设置过期时间为 maxRemainTime,然后返回0结束lua脚本的执行

pexpire myLock maxRemainTime

这里我们会思考一个问题,为什么锁map中的key都大于1了,证明肯定还有线程持有锁,那为什么还会存在 maxRemainTime 最后小于0的情况呢?

有一个点我们还没学到,那就是其实读写锁中,如果是获取写锁,并不会新增一条写锁的超时记录,因为读写锁中,写锁和写锁是互斥的,写锁和读锁也是互斥的,即使支持当前线程先获取写锁再获取读锁,其实也不需要增加一条写锁的超时时间,因为读写锁 key 的超时时间就等于写锁的超时时间。

4 . 如果当前读写锁的锁模式是写锁,直接返回0结束lua脚本的执行

5、最后操作

场景:

  • 当走到最后的操作,证明当前线程不但成功释放锁,并且释放后当前读写锁已经没有其他线程再持有锁了
  • 所以到这里,我们可以直接将读写锁对应的key直接删掉,并且往读写锁对应的channel中发布释放锁消息

lua脚本:

"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "

分析:

1 . 利用 del 命令删除读写锁对应的 key

 del myLock

2 . 往读写锁对应的channel发布锁释放消息

publish redisson_rwlock:{myLock} 0

二、最后

到此,关于读写锁 RReadWriteLock 中读锁 RedissonReadLock 释放锁的原理已经分析完了,至于释放锁后续的停止 watchdog 的执行等操作,还是和 RedissLock 保持一致,我们就不再分析了。

同时我们可以发现,其实分析读锁时很多地方还是需要同时知道写锁的部分原理,我们这里只是提前透露了一些,用于支撑整个读锁释放锁到原理分析。不过接下来,也将会对写锁 RedissonWriteLock 加锁和释放锁的原理进行分析,在梳理过程中,也会带上一定的场景来分析,尽量做到完全理解为啥 Redisson 要这么做~

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8