Redisson分布式锁学习总结:读锁 RedissonReadLock#lock 获取锁源码分析

1143次阅读  |  发布于3年以前

1、RReadWriteLock 使用&读写锁介绍

首先,我们先简单上个简单使用 RReadWriteLock 的 demo:

public class RedissonReadWriteLockDemo {

    public static void main(String[] args) {
        RedissonClient client = RedissonClientUtil.getClient("");
        RReadWriteLock readWriteLock = client.getReadWriteLock("myLock");
        RLock readLock = readWriteLock.readLock();
        RLock writeLock = readWriteLock.writeLock();

        // 场景一:先读锁,后写锁
        readLock.lock();
        writeLock.lock();
        readLock.unlock();
        writeLock.unlock();

        // 场景二:重复获取读锁
        readLock.lock();
        readLock.lock();
        readLock.unlock();
        readLock.unlock();

        // 场景三:先写锁,后读锁
        writeLock.lock();
        readLock.lock();
        writeLock.unlock();
        readLock.lock();

        // 场景四:先写锁,再读锁
        writeLock.lock();
        writeLock.lock();
        writeLock.unlock();
        writeLock.unlock();
    }
}

读写锁作用:

使用场景: 读写锁主要作用就是做到保证数据的一致性。例如有人在读数据的时候,就应该避免数据被修改,从而保证数据读取前后的一致性,其他的情况一样,例如写读,写写;但是如果是读读就没有必要做到互斥了,因为此时数据不会被修改,多少个人过来读都是一样的,不会影响到数据的一致性。

但是,在平时开发中,读写锁是基本用不上的,因为MySQL自带的 MVCC 机制和锁机制就能保证了数据的原子性、一致性、隔离性和持久性。也就是我们说的ACID。而我们普通的程序或者系统,无非就是对数据库的表数据进行增删改查。

如果我们在程序中自己维护了一套元数据,例如map集合,并且我们的程序是分布式系统,那么就可以利用Redisson提供的分布式读写锁来保证元数据的一致性了。

那么下面我们将分析 redisson 提供的 RReadWriteLock 读写锁是如何做到分布式读写控制的,并且证实一下 RReadWriteLock 提供的作用是否和上面所说的一致,还是说会有点出入。

2、RReadWriteLock 和 RedissonLock 的关系

从上面文章我们也知道,Redisson 提供的公平锁是基于 RedissonLock 做的扩展;但其实不但是公平锁,Redisson 提供的读写锁也是基于 RedissonLcok 上做的扩展:

所以接下来,我们只需要分析读锁和写锁的加锁逻辑和释放锁逻辑;但还有读锁的 wathcdog lua 脚本也是要分析的。因为读锁 watchdog 机制虽然和 RedissonLock 保持的是一致的,但是执行的 lua 脚本的方法却是重写过的。

3、RedissonReadLock 之 lua 脚本加锁

RedissonReadLock#tryLockInnerAsync:

@Override
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                            "local mode = redis.call('hget', KEYS[1], 'mode'); " +
                            "if (mode == false) then " +
                              "redis.call('hset', KEYS[1], 'mode', 'read'); " +
                              "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                              "redis.call('set', KEYS[2] .. ':1', 1); " +
                              "redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
                              "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                              "return nil; " +
                            "end; " +
                            "if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
                              "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + 
                              "local key = KEYS[2] .. ':' .. ind;" +
                              "redis.call('set', key, 1); " +
                              "redis.call('pexpire', key, ARGV[1]); " +
                              "local remainTime = redis.call('pttl', KEYS[1]); " +
                              "redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " +
                              "return nil; " +
                            "end;" +
                            "return redis.call('pttl', KEYS[1]);",
                    Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)), 
                    internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId));
}

lua 脚本不长,我们一步一步分析。

3.1、KEYS

Arrays.asList(getName(), getReadWriteTimeoutNamePrefix(threadId))

  • getName(): 锁key
  • getReadWriteTimeoutNamePrefix(threadId):suffixName(getName(), getLockName(threadId)) + ":rwlock_timeout"
  • getName():锁key
  • getLockName(threadId):id:threadId -> 客户端UUID:线程ID -> 假设 UUID-1:threadId-1
  • suffixName():{锁key}:UUID:threadId

KEYS:["myLock","{myLock}:UUID-1:threadId-1:rwlock_timeout"]

3.2、ARGVS

internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId))

  • internalLockLeaseTime:其实就是 watchdog 的超时时间,默认是30000毫秒,可看 Config#lockWatchdogTimeout。

    private long lockWatchdogTimeout = 30 * 1000;
    
  • getLockName(threadId):return id + ":" + threadId,客户端ID(UUID):线程ID(threadId)

  • getWriteLockName(threadId)):super.getLockName(threadId) + ":write"

ARGVS:[30_000毫秒,"UUID-1:threadId-1","UUID-1:threadId-1:write"]

3.3、lua 脚本分析

1、第一步,获取锁模式

lua脚本

local mode = redis.call('hget', KEYS[1], 'mode');

分析:

  1. 利用 hget 命令获取当前锁模式
  hget myLock mode

2、分支一:锁的模式为空,即当前锁尚未有线程获取

场景:

  • 当前线程尝试获取读锁时,还没有其他线程成功获取锁,不管是读锁还是写锁

lua脚本:

"if (mode == false) then " +
  "redis.call('hset', KEYS[1], 'mode', 'read'); " +
  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
  "redis.call('set', KEYS[2] .. ':1', 1); " +
  "redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  "return nil; " +
"end; " +

分析:

  1. 利用 hset 命令设置锁模式为读锁
  hset myLock mode read

执行后,锁内容如下:

  myLock:{
    "mode":"read"
}
  1. 利用 hset 命令为当前线程添加加锁次数记录
hset myLock UUID-1:threadId-1 1

执行后,锁的内容如下:

  myLock:{
    "mode":"read",
    "UUID-1:threadId-1":1
}
  1. 利用 set 命令为当前线程添加一条加锁超时记录
  set {myLock}:UUID-1:threadId-1:rwlock_timeout:1 1

执行后,redis里读锁的相关数据:

  myLock:{
    "mode":"read",
    "UUID-1:threadId-1":1
}{myLock}:UUID-1:threadId-1:rwlock_timeout:1 1
  1. 利用 pexpire 命令为锁&当前线程超时记录添加过期时间
  pexpire {myLock}:UUID-1:threadId-1:rwlock_timeout:1 30000
pexpire myLock 30000

设置锁,还是之前提到的那个点,避免出现死锁的情况。 5. 最后返回nil,表示获取锁成功

2、分支二:锁模式为读锁或锁模式为写锁并且获取写锁为当前线程,当前线程可再次获取读锁

场景:

  • 锁模式为读锁,当前线程可获取读锁。即:redisson提供的读写锁支持不同线程重复获取锁

  • 锁模式为写锁,并且获取写锁的线程为当前线程,当前线程可获取读锁。即:redisson 提供的读写锁,读写并不是完全互斥,而是支持同一线程先获取写锁再获取读锁。

    关于写锁判断,我们只能到分析获取写锁的lua脚本时再回头看看了;但是我们可以从这里提前知道,如果为写锁添加加锁次数记录,使用的 key 是 UUID-1:threadId-1:write,而读锁使用的 key 是 UUID-1:threadId-1

lua脚本:

"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
  "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + 
  "local key = KEYS[2] .. ':' .. ind;" +
  "redis.call('set', key, 1); " +
  "redis.call('pexpire', key, ARGV[1]); " +
  "local remainTime = redis.call('pttl', KEYS[1]); " +
  "redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " +
  "return nil; " +
"end;"

分析:

1 . 利用 hincrby 命令为当前线程增加加锁次数

 hincrby myLock UUID-1:threadId-1 1

假设当前线程之前获取过1次锁(假设是读锁),执行后锁内容如下:

  myLock:{
    "mode":"read",
    "UUID-1:threadId-1":2
}

2 . 为当前线程拼接加锁记录 key,然后利用 set 命令添加一条加锁超时记录

 key = {myLock}:UUID-1:threadId-1:rwlock_timeout:2
set {myLock}:UUID-1:threadId-1:rwlock_timeout:2 1

执行后,redis里读锁的相关数据:

  myLock:{
    "mode":"read",
    "UUID-1:threadId-1":2
}

{myLock}:UUID-1:threadId-1:rwlock_timeout:1 1
{myLock}:UUID-1:threadId-1:rwlock_timeout:2 1

3 . 给新的加锁超时记录设置过期时间

pexpire {myLock}:UUID-1:threadId-1:rwlock_timeout:2 30000

4 . 最后,pttl 命令获取锁的过期时间,利用 pexipre 给锁重新设置锁的过期时间

  pttl myLock
pttl 和 30000 中选出最大值
pexpire myLock 最大值

5 . 最后返回nil,表示获取锁成功

3、分支三:获取锁失败,返回锁pttl

场景:

  • 不满足上面的两个分支,当前线程就无法成功获取读锁

lua脚本:

"return redis.call('pttl', KEYS[1]);"

分析:

1 . 利用 pttl 命令获取锁过期时间(毫秒)

pttl myLock

2 . 直接返回步骤1的获取到的毫秒数

4、watchdog 续命逻辑

我们上面提到,虽然 RReadWriteLock 还是延用 RedissonLock watchdog 的机制,但是读锁 对于 watchdog 执行 lua 脚本的方法是进行了重写的,所以我们还是需要分析读锁关于 watchdog lua 脚本的执行逻辑,而后面的写锁就不再需要关注了。

4.1、RedissonReadLock 之 watchdog lua 脚本

RedissonReadLock#renewExpirationAsync:

@Override
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
    String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);

    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "local counter = redis.call('hget', KEYS[1], ARGV[2]); " +
            "if (counter ~= false) then " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +

                "if (redis.call('hlen', KEYS[1]) > 1) then " +
                    "local keys = redis.call('hkeys', KEYS[1]); " + 
                    "for n, key in ipairs(keys) do " + 
                        "counter = tonumber(redis.call('hget', KEYS[1], key)); " + 
                        "if type(counter) == 'number' then " + 
                            "for i=counter, 1, -1 do " + 
                                "redis.call('pexpire', KEYS[2] .. ':' .. key .. ':rwlock_timeout:' .. i, ARGV[1]); " + 
                            "end; " + 
                        "end; " + 
                    "end; " +
                "end; " +

                "return 1; " +
            "end; " +
            "return 0;",
        Arrays.<Object>asList(getName(), keyPrefix), 
        internalLockLeaseTime, getLockName(threadId));
}

lua 脚本不长,我们一步一步分析。

4.1、KEYS

Arrays.asList(getName(), keyPrefix)

  • getName(): 锁key
  • keyPrefix:getKeyPrefix(threadId, timeoutPrefix)
  • timeoutPrefix:getReadWriteTimeoutNamePrefix(threadId) -> {myLock}:UUID-1:threadId-1:rwlock_timeout
  • getKeyPrefix(threadId, timeoutPrefix):timeoutPrefix.split(":" + getLockName(threadId))[0]; -> {myLock}:UUID-1:threadId-1:rwlock_timeout.split(":UUID-1")[0] -> {myLock}

KEYS:["myLock","{myLock}"]

4.2、ARGVS

internalLockLeaseTime, getLockName(threadId)

  • internalLockLeaseTime:其实就是 watchdog 的超时时间,默认是30000毫秒,可看 Config#lockWatchdogTimeout。

    private long lockWatchdogTimeout = 30 * 1000;
    
  • getLockName(threadId):return id + ":" + threadId,客户端ID(UUID):线程ID(threadId)

ARGVS:[30_000毫秒,"UUID-1:threadId-1"]

4.3、lua 脚本分析

1、第一步,获取当前线程的加锁次数

lua脚本

local counter = redis.call('hget', KEYS[1], ARGV[2]);

分析:

1 . 利用 hget 命令获取当前当前线程的加锁次数

  hget myLock UUID-1:threadId-1

2、分支一:当前线程获取锁次数大于0,刷新锁过期时间;如果锁集合元素大于1,刷新里面加锁线程的过期时间

lua脚本:

"if (counter ~= false) then " +
    "redis.call('pexpire', KEYS[1], ARGV[1]); " +

    "if (redis.call('hlen', KEYS[1]) > 1) then " +
        "local keys = redis.call('hkeys', KEYS[1]); " + 
        "for n, key in ipairs(keys) do " + 
            "counter = tonumber(redis.call('hget', KEYS[1], key)); " + 
            "if type(counter) == 'number' then " + 
                "for i=counter, 1, -1 do " + 
                    "redis.call('pexpire', KEYS[2] .. ':' .. key .. ':rwlock_timeout:' .. i, ARGV[1]); " + 
                "end; " + 
            "end; " + 
        "end; " +
    "end; " +

    "return 1; " +
"end; " +

分析:

1 . 利用 pexpire 命令重新刷新锁过期时间

  pexpire myLock 30000

2 . 利用 hlen 命令获取锁集合里面的元素个数,然后判断是否大于1个以上key

 hlen myLock

做这个判断是因为,如果是可重入锁或者公平锁,锁集合里面只有有一个key,就是当前成功获取锁的线程。但如果是读写锁,他里面可包含2个以上key,其中一个就是锁的模式,即之前提到的 mode 字段,可表示当前锁是读锁还是写锁。

3 . 如果锁集合里面大于1个key,需为每个加锁线程刷新过期时间

 hkeys myLock
  hget myLock key

每加锁一次

  pexpire {myLock}:UUID-1:threadId-1:rwlock:timeout:value 30000

之所以遍历加锁次数,看完上面其实也清楚了。因为每成功加锁一次,redisson 都会为当前线程新增一条加锁记录,并且设置过期时间。所以其实步骤3整体就是为了给所有加锁记录刷新过期时间。

  • 如果key的值为数字,证明此key是加锁成功的线程,并且value的值表示线程加锁次数;我们需要遍历加锁次数我们利用 pexpire 为这个线程对应的加锁记录刷新过期时间

  • 遍历所有key,并利用 hget 命令来获取key对应的value

  • 利用 hkeys 命令获取锁集合里面所有key

4 . 如果步骤3不满足,直接返回1

到此,文章已经介绍了 RedissonReadLock 是如何获取锁的,也分析了 wathchdog 为锁续命的逻辑。关于 RedissonReadLock 如何释放锁,以及 RedissonWriteLock 如何获取锁和释放锁,将继续按照之前的风格,分别写上三篇文章。

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8