首先,我们先简单上个简单使用 RReadWriteLock 的 demo:
public class RedissonReadWriteLockDemo {
public static void main(String[] args) {
RedissonClient client = RedissonClientUtil.getClient("");
RReadWriteLock readWriteLock = client.getReadWriteLock("myLock");
RLock readLock = readWriteLock.readLock();
RLock writeLock = readWriteLock.writeLock();
// 场景一:先读锁,后写锁
readLock.lock();
writeLock.lock();
readLock.unlock();
writeLock.unlock();
// 场景二:重复获取读锁
readLock.lock();
readLock.lock();
readLock.unlock();
readLock.unlock();
// 场景三:先写锁,后读锁
writeLock.lock();
readLock.lock();
writeLock.unlock();
readLock.lock();
// 场景四:先写锁,再读锁
writeLock.lock();
writeLock.lock();
writeLock.unlock();
writeLock.unlock();
}
}
读写锁作用:
使用场景: 读写锁主要作用就是做到保证数据的一致性。例如有人在读数据的时候,就应该避免数据被修改,从而保证数据读取前后的一致性,其他的情况一样,例如写读,写写;但是如果是读读就没有必要做到互斥了,因为此时数据不会被修改,多少个人过来读都是一样的,不会影响到数据的一致性。
但是,在平时开发中,读写锁是基本用不上的,因为MySQL自带的 MVCC 机制和锁机制就能保证了数据的原子性、一致性、隔离性和持久性。也就是我们说的ACID。而我们普通的程序或者系统,无非就是对数据库的表数据进行增删改查。
如果我们在程序中自己维护了一套元数据,例如map集合,并且我们的程序是分布式系统,那么就可以利用Redisson提供的分布式读写锁来保证元数据的一致性了。
那么下面我们将分析 redisson 提供的 RReadWriteLock 读写锁是如何做到分布式读写控制的,并且证实一下 RReadWriteLock 提供的作用是否和上面所说的一致,还是说会有点出入。
从上面文章我们也知道,Redisson 提供的公平锁是基于 RedissonLock 做的扩展;但其实不但是公平锁,Redisson 提供的读写锁也是基于 RedissonLcok 上做的扩展:
所以接下来,我们只需要分析读锁和写锁的加锁逻辑和释放锁逻辑;但还有读锁的 wathcdog lua 脚本也是要分析的。因为读锁 watchdog 机制虽然和 RedissonLock 保持的是一致的,但是执行的 lua 脚本的方法却是重写过的。
RedissonReadLock#tryLockInnerAsync:
@Override
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('hset', KEYS[1], 'mode', 'read'); " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('set', KEYS[2] .. ':1', 1); " +
"redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
"local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"local key = KEYS[2] .. ':' .. ind;" +
"redis.call('set', key, 1); " +
"redis.call('pexpire', key, ARGV[1]); " +
"local remainTime = redis.call('pttl', KEYS[1]); " +
"redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " +
"return nil; " +
"end;" +
"return redis.call('pttl', KEYS[1]);",
Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)),
internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId));
}
lua 脚本不长,我们一步一步分析。
Arrays.
KEYS:["myLock","{myLock}:UUID-1:threadId-1:rwlock_timeout"]
internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId))
internalLockLeaseTime:其实就是 watchdog 的超时时间,默认是30000毫秒,可看 Config#lockWatchdogTimeout。
private long lockWatchdogTimeout = 30 * 1000;
getLockName(threadId):return id + ":" + threadId,客户端ID(UUID):线程ID(threadId)
getWriteLockName(threadId)):super.getLockName(threadId) + ":write"
ARGVS:[30_000毫秒,"UUID-1:threadId-1","UUID-1:threadId-1:write"]
lua脚本
local mode = redis.call('hget', KEYS[1], 'mode');
分析:
hget myLock mode
场景:
lua脚本:
"if (mode == false) then " +
"redis.call('hset', KEYS[1], 'mode', 'read'); " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('set', KEYS[2] .. ':1', 1); " +
"redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
分析:
hset myLock mode read
执行后,锁内容如下:
myLock:{
"mode":"read"
}
hset myLock UUID-1:threadId-1 1
执行后,锁的内容如下:
myLock:{
"mode":"read",
"UUID-1:threadId-1":1
}
set {myLock}:UUID-1:threadId-1:rwlock_timeout:1 1
执行后,redis里读锁的相关数据:
myLock:{
"mode":"read",
"UUID-1:threadId-1":1
}{myLock}:UUID-1:threadId-1:rwlock_timeout:1 1
pexpire {myLock}:UUID-1:threadId-1:rwlock_timeout:1 30000
pexpire myLock 30000
设置锁,还是之前提到的那个点,避免出现死锁的情况。 5. 最后返回nil,表示获取锁成功
场景:
锁模式为读锁,当前线程可获取读锁。即:redisson提供的读写锁支持不同线程重复获取锁
锁模式为写锁,并且获取写锁的线程为当前线程,当前线程可获取读锁。即:redisson 提供的读写锁,读写并不是完全互斥,而是支持同一线程先获取写锁再获取读锁。
关于写锁判断,我们只能到分析获取写锁的lua脚本时再回头看看了;但是我们可以从这里提前知道,如果为写锁添加加锁次数记录,使用的 key 是 UUID-1:threadId-1:write,而读锁使用的 key 是 UUID-1:threadId-1
lua脚本:
"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
"local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"local key = KEYS[2] .. ':' .. ind;" +
"redis.call('set', key, 1); " +
"redis.call('pexpire', key, ARGV[1]); " +
"local remainTime = redis.call('pttl', KEYS[1]); " +
"redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " +
"return nil; " +
"end;"
分析:
1 . 利用 hincrby 命令为当前线程增加加锁次数
hincrby myLock UUID-1:threadId-1 1
假设当前线程之前获取过1次锁(假设是读锁),执行后锁内容如下:
myLock:{
"mode":"read",
"UUID-1:threadId-1":2
}
2 . 为当前线程拼接加锁记录 key,然后利用 set 命令添加一条加锁超时记录
key = {myLock}:UUID-1:threadId-1:rwlock_timeout:2
set {myLock}:UUID-1:threadId-1:rwlock_timeout:2 1
执行后,redis里读锁的相关数据:
myLock:{
"mode":"read",
"UUID-1:threadId-1":2
}
{myLock}:UUID-1:threadId-1:rwlock_timeout:1 1
{myLock}:UUID-1:threadId-1:rwlock_timeout:2 1
3 . 给新的加锁超时记录设置过期时间
pexpire {myLock}:UUID-1:threadId-1:rwlock_timeout:2 30000
4 . 最后,pttl 命令获取锁的过期时间,利用 pexipre 给锁重新设置锁的过期时间
pttl myLock
pttl 和 30000 中选出最大值
pexpire myLock 最大值
5 . 最后返回nil,表示获取锁成功
场景:
lua脚本:
"return redis.call('pttl', KEYS[1]);"
分析:
1 . 利用 pttl 命令获取锁过期时间(毫秒)
pttl myLock
2 . 直接返回步骤1的获取到的毫秒数
我们上面提到,虽然 RReadWriteLock 还是延用 RedissonLock watchdog 的机制,但是读锁 对于 watchdog 执行 lua 脚本的方法是进行了重写的,所以我们还是需要分析读锁关于 watchdog lua 脚本的执行逻辑,而后面的写锁就不再需要关注了。
RedissonReadLock#renewExpirationAsync:
@Override
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local counter = redis.call('hget', KEYS[1], ARGV[2]); " +
"if (counter ~= false) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"if (redis.call('hlen', KEYS[1]) > 1) then " +
"local keys = redis.call('hkeys', KEYS[1]); " +
"for n, key in ipairs(keys) do " +
"counter = tonumber(redis.call('hget', KEYS[1], key)); " +
"if type(counter) == 'number' then " +
"for i=counter, 1, -1 do " +
"redis.call('pexpire', KEYS[2] .. ':' .. key .. ':rwlock_timeout:' .. i, ARGV[1]); " +
"end; " +
"end; " +
"end; " +
"end; " +
"return 1; " +
"end; " +
"return 0;",
Arrays.<Object>asList(getName(), keyPrefix),
internalLockLeaseTime, getLockName(threadId));
}
lua 脚本不长,我们一步一步分析。
Arrays.
KEYS:["myLock","{myLock}"]
internalLockLeaseTime, getLockName(threadId)
internalLockLeaseTime:其实就是 watchdog 的超时时间,默认是30000毫秒,可看 Config#lockWatchdogTimeout。
private long lockWatchdogTimeout = 30 * 1000;
getLockName(threadId):return id + ":" + threadId,客户端ID(UUID):线程ID(threadId)
ARGVS:[30_000毫秒,"UUID-1:threadId-1"]
lua脚本
local counter = redis.call('hget', KEYS[1], ARGV[2]);
分析:
1 . 利用 hget 命令获取当前当前线程的加锁次数
hget myLock UUID-1:threadId-1
lua脚本:
"if (counter ~= false) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"if (redis.call('hlen', KEYS[1]) > 1) then " +
"local keys = redis.call('hkeys', KEYS[1]); " +
"for n, key in ipairs(keys) do " +
"counter = tonumber(redis.call('hget', KEYS[1], key)); " +
"if type(counter) == 'number' then " +
"for i=counter, 1, -1 do " +
"redis.call('pexpire', KEYS[2] .. ':' .. key .. ':rwlock_timeout:' .. i, ARGV[1]); " +
"end; " +
"end; " +
"end; " +
"end; " +
"return 1; " +
"end; " +
分析:
1 . 利用 pexpire 命令重新刷新锁过期时间
pexpire myLock 30000
2 . 利用 hlen 命令获取锁集合里面的元素个数,然后判断是否大于1个以上key
hlen myLock
做这个判断是因为,如果是可重入锁或者公平锁,锁集合里面只有有一个key,就是当前成功获取锁的线程。但如果是读写锁,他里面可包含2个以上key,其中一个就是锁的模式,即之前提到的 mode 字段,可表示当前锁是读锁还是写锁。
3 . 如果锁集合里面大于1个key,需为每个加锁线程刷新过期时间
hkeys myLock
hget myLock key
每加锁一次
pexpire {myLock}:UUID-1:threadId-1:rwlock:timeout:value 30000
之所以遍历加锁次数,看完上面其实也清楚了。因为每成功加锁一次,redisson 都会为当前线程新增一条加锁记录,并且设置过期时间。所以其实步骤3整体就是为了给所有加锁记录刷新过期时间。
如果key的值为数字,证明此key是加锁成功的线程,并且value的值表示线程加锁次数;我们需要遍历加锁次数我们利用 pexpire 为这个线程对应的加锁记录刷新过期时间
遍历所有key,并利用 hget 命令来获取key对应的value
利用 hkeys 命令获取锁集合里面所有key
4 . 如果步骤3不满足,直接返回1
到此,文章已经介绍了 RedissonReadLock 是如何获取锁的,也分析了 wathchdog 为锁续命的逻辑。关于 RedissonReadLock 如何释放锁,以及 RedissonWriteLock 如何获取锁和释放锁,将继续按照之前的风格,分别写上三篇文章。
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8