上一篇已经简单介绍了,redisson 提供的读写锁 RReadWriteLock 的使用demo、使用场景、和RedissonLock 的关系;也深入分析了读锁 RedissonReadLock 加锁 lua 脚本的执行逻辑、watchdog 机制 lua 脚本的执行逻辑。
下面,我们将继续分析读锁 RedissonReadLock 释放锁时,lua 脚本是怎么执行的。
分析前,我们定一下加锁的key:
RReadWriteLock readWriteLock = client.getReadWriteLock("myLock");
RedissonReadLock#unlockInnerAsync:
@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
"if (lockExists == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " +
"if (counter == 0) then " +
"redis.call('hdel', KEYS[1], ARGV[2]); " +
"end;" +
"redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +
"if (redis.call('hlen', KEYS[1]) > 1) then " +
"local maxRemainTime = -3; " +
"local keys = redis.call('hkeys', KEYS[1]); " +
"for n, key in ipairs(keys) do " +
"counter = tonumber(redis.call('hget', KEYS[1], key)); " +
"if type(counter) == 'number' then " +
"for i=counter, 1, -1 do " +
"local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " +
"maxRemainTime = math.max(remainTime, maxRemainTime);" +
"end; " +
"end; " +
"end; " +
"if maxRemainTime > 0 then " +
"redis.call('pexpire', KEYS[1], maxRemainTime); " +
"return 0; " +
"end;" +
"if mode == 'write' then " +
"return 0;" +
"end; " +
"end; " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; ",
Arrays.<Object>asList(getName(), getChannelName(), timeoutPrefix, keyPrefix),
LockPubSub.UNLOCK_MESSAGE, getLockName(threadId));
}
我们可以看到,这读锁释放锁的lua脚本还是比较长的,但是我们也不用着急,一步一步分析就可以了。
Arrays.
KEYS:["myLock","redisson_rwlock:{myLock}","{myLock}:UUID-1:threadId-1:rwlock_timeout","{myLock}"]
LockPubSub.UNLOCK_MESSAGE, getLockName(threadId)
ARGVS:[0L,"UUID:threadId"]
场景:
lua脚本:
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; "
分析:
1 . 利用 hget 命令获取读写锁的模式
hget myLock mode
2 . 如果锁模式为空,往读写锁对应的channel发送释放锁的消息,然后返回1,lua脚本执行完毕
publish redisson_rwlock:{myLock} 0
channel 发布锁释放消息的用处:
场景:
lua脚本:
"local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
"if (lockExists == 0) then " +
"return nil;" +
"end; "
分析:
1 . 利用 hexists 命令判断当前线程是否持有锁
hexists myLock UUID-1:threadId-1
2 . 如果不存在直接返回null,表示释放锁失败
场景:
lua脚本:
"local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " +
"if (counter == 0) then " +
"redis.call('hdel', KEYS[1], ARGV[2]); " +
"end;" +
"redis.call('del', KEYS[3] .. ':' .. (counter+1)); "
分析: 分析前,我们假设线程 UUID-1:threadId-1 持有两个读锁,那么 redis 中锁相关数据如下:
myLock:{
"mode":"read",
"UUID-1:threadId-1":2
}
{myLock}:UUID-1:threadId-1:rwlock_timeout:1 1
{myLock}:UUID-1:threadId-1:rwlock_timeout:2 1
1 . 利用 hincrby 命令,给当前线程持有锁数量减1
hincrby myLock UUID-1:threadId-1 -1
2 . 如果持有锁数量减1后等于0,证明当前线程不再持有锁,那么利用 hdel 命令将锁map中加锁次数记录删掉
hdel myLock UUID:threadId
3 . 删除线程持有锁对应的加锁超时记录
del {myLock}:UUID-1:threadId-1:rwlock_timeout:count+1
分析后,由于我们假设了当前线程持有锁两次,所以只会执行步骤1和步骤3,执行后redis 中锁相关数据如下:
myLock:{
"mode":"read",
"UUID-1:threadId-1":1
}
{myLock}:UUID-1:threadId-1:rwlock_timeout:1 1
到这里,我们可能第一时间会有点疑惑:为什么给读锁扣减不需要先判断锁的模式?
其实在前一篇文章中,我们就提及到两点:
场景:
lua脚本:
"if (redis.call('hlen', KEYS[1]) > 1) then " +
"local maxRemainTime = -3; " +
"local keys = redis.call('hkeys', KEYS[1]); " +
"for n, key in ipairs(keys) do " +
"counter = tonumber(redis.call('hget', KEYS[1], key)); " +
"if type(counter) == 'number' then " +
"for i=counter, 1, -1 do " +
"local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " +
"maxRemainTime = math.max(remainTime, maxRemainTime);" +
"end; " +
"end; " +
"end; " +
"if maxRemainTime > 0 then " +
"redis.call('pexpire', KEYS[1], maxRemainTime); " +
"return 0; " +
"end;" +
"if mode == 'write' then " +
"return 0;" +
"end; " +
"end; "
分析:
1 . 利用 hlen 获取锁map中key的数量
hlen myLock
2 . 如果锁map中 key 的数量还是大于1,那么证明还有线程持有锁,遍历锁map集合中的加锁次数key,根据加锁超时记录获取最大的超时时间
hkeys myLock
a . 拼接 key 对应的加锁记录对应的超时时间,利用 pttl 获取超时时间
b . 与 maxRemainTime 对比,获取当前最大的超时时间,赋值给 maxRemainTime
c . 利用 hget 命令获取key对应的加锁次数
ⅲ . 遍历步骤2获取到的keys
hget myLock key
ⅳ . 遍历加锁次数
pttl {myLock}:UUID-1:threadId-1:rwlock_timeout:num -> remainTime
maxRemainTime = math.max(remainTime, maxRemainTime)
ⅵ. 设置 maxRemainTime 为 -3
ⅶ. 利用 hkeys 命令获取锁map中所有key
3 . 判断 maxRemainTime 是否大于0,如果大于0,给锁重新设置过期时间为 maxRemainTime,然后返回0结束lua脚本的执行
pexpire myLock maxRemainTime
这里我们会思考一个问题,为什么锁map中的key都大于1了,证明肯定还有线程持有锁,那为什么还会存在 maxRemainTime 最后小于0的情况呢?
有一个点我们还没学到,那就是其实读写锁中,如果是获取写锁,并不会新增一条写锁的超时记录,因为读写锁中,写锁和写锁是互斥的,写锁和读锁也是互斥的,即使支持当前线程先获取写锁再获取读锁,其实也不需要增加一条写锁的超时时间,因为读写锁 key 的超时时间就等于写锁的超时时间。
4 . 如果当前读写锁的锁模式是写锁,直接返回0结束lua脚本的执行
场景:
lua脚本:
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "
分析:
1 . 利用 del 命令删除读写锁对应的 key
del myLock
2 . 往读写锁对应的channel发布锁释放消息
publish redisson_rwlock:{myLock} 0
到此,关于读写锁 RReadWriteLock 中读锁 RedissonReadLock 释放锁的原理已经分析完了,至于释放锁后续的停止 watchdog 的执行等操作,还是和 RedissLock 保持一致,我们就不再分析了。
同时我们可以发现,其实分析读锁时很多地方还是需要同时知道写锁的部分原理,我们这里只是提前透露了一些,用于支撑整个读锁释放锁到原理分析。不过接下来,也将会对写锁 RedissonWriteLock 加锁和释放锁的原理进行分析,在梳理过程中,也会带上一定的场景来分析,尽量做到完全理解为啥 Redisson 要这么做~
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8