继续项目实战,集成Redis分布式锁(大神勿进)

1172次阅读  |  发布于3年以前

本文是我们小项目的第三篇文了,本次我们来把分布式锁应用到我们的项目中,使用Redis实现的分布式锁功能,这一切都是为我们往后的工作做铺垫,希望大家能get到分布式锁这项新技能。

[第一篇:Spring boot项目搭建(前端到数据库,超详细),大神勿进!]

[第二篇:实战 用户登录、session校验、分布式存储session]

上一篇文章中,我们已经把Redis集成到项目中,并且还实现了基于Redis来分布式存储session的方案。

今天我们就来把基于Redis实现的分布式锁,集成到我们的项目中,分布式锁历来都受到大家的关注。不管是工作中、面试中,分布式锁永远是个不老的话题,也希望大家能掌握此技能,便于大家日后能"升官发财"。

分布式锁

背景

为什么要有分布式锁呢?不是已经有synchronized、ReantrantLock等相关锁了吗?

是的,我们在开发应用的时候,如果需要对某一个共享变量进行多线程同步访问的时候,可以使用我们学到的锁进行处理,并且可以完美的运行,毫无Bug!

注意:这是单机应用,后来业务发展,需要做集群,一个应用需要部署到几台机器上然后做负载均衡 :

上图可以看到,变量A存在三个服务器内存中(这个变量A主要体现是在一个类中的一个成员变量,是一个有状态的对象),如果不加任何控制的话,变量A同时都会在分配一块内存,三个请求发过来同时对这个变量操作,显然结果是不对的!即使不是同时发过来,三个请求分别操作三个不同内存区域的数据,变量A之间不存在共享,也不具有可见性,处理的结果也是不对的!

如果我们业务中确实存在这个场景的话,我们就需要一种方法解决这个问题!

为了保证一个方法或属性在高并发情况下的同一时间只能被同一个线程执行,在传统单体应用单机部署的情况下,可以使用并发处理相关的功能进行互斥控制。但是,随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的应用并不能提供分布式锁的能力。为了解决这个问题就需要一种跨机器的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!

分布锁的基本原理

分布式环境下,多台机器上多个进程对同一个共享资源(数据、文件等)进行操作,如果不做互斥,就有可能出现“余额扣成负数”,或者“商品超卖”的情况。

为了解决这个问题,需要分布式锁服务。首先,来看一下分布式锁应该具备哪些条件。

目前市面上,分布式锁的实现方案大致有三种:

  1. 数据库乐观锁;
  2. 基于分布式缓存实现的锁服务,典型代表有 Redis 和基于 Redis 的 RedLock;
  3. 基于分布式一致性算法实现的锁服务,典型代表有 ZooKeeper、Chubby 和 ETCD。

项目中,使用的最多的是后两种,其实每种方案都各有利弊

预防死锁

我们看下面这个典型死锁场景。

一个客户端获取锁成功,但是在释放锁之前崩溃了,此时该客户端实际上已经失去了对公共资源的操作权,但却没有办法请求解锁(删除 Key-Value 键值对),那么,它就会一直持有这个锁,而其它客户端永远无法获得锁。

我们的解决方案是:在加锁时为锁设置过期时间,当过期时间到达,Redis 会自动删除对应的 Key-Value,从而避免死锁。需要注意的是,这个过期时间需要结合具体业务综合评估设置,以保证锁的持有者能够在过期时间之内执行完相关操作并释放锁。

另外,前面已经说了,在实际项目中我们都是使用后两种方案,所以我们重点在后两种方案上。说明 此文章是基于前面我们搞的项目继续开展,同时把Redis已经集成到项目中了,所以此文中分布式锁是基于Redis的实现。

基于Redis实现分布式锁

先说一下使用Redis实现方案的思路:

  1. setnx +expire+delete
  2. setnx+lua
  3. set key value px milliseconds nx

SETNX是『 SET if Not eXists』(如果不存在,则 SET)的简写,设置成功就返回1,否则返回0。

简单版

创建分布锁

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

@Component
public class DistributedLockV1 {

    private Logger logger = LoggerFactory.getLogger(DistributedLockV1.class);

    @Resource
    private RedisTemplate redisTemplate;

    public boolean lock(String businessKey) {
        boolean result = false;
        String uniqueValue = UUID.randomUUID().toString();
        try {
            //@see <a href="http://redis.io/commands/setnx">Redis Documentation: SETNX</a>
            result = redisTemplate.opsForValue().setIfAbsent(businessKey, uniqueValue);
            if (!result) {
                return false;
            }
            //设置key的有效期
            redisTemplate.expire(businessKey, 10, TimeUnit.SECONDS);
            return result;
        } catch (Exception ex) {
            logger.error("获取锁失败", ex);
        }
        return result;
    }

    public void unlock(String businessKey) {
        try {
            //delete
            redisTemplate.delete(businessKey);
        } catch (Exception ex) {
            logger.error("释放锁失败", ex);
        }
    }
}

就这样,一个简单的分布式锁就实现了,但是这里会存在问题,问题也不是一定会出现,在特定的时刻还是会出现的。

下面,我们就来把这个分布式锁应用到用户账户余额扣减的功能中。

我们来创建一张用户账户表,表中主要有userId和余额:

CREATE TABLE `user_account` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `user_id` bigint DEFAULT NULL,
  `balance` decimal(10,2) DEFAULT NULL,
  `create_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
  `update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

然后创建UserAccountRepository接口。

import com.tian.user.entity.UserAccount;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.stereotype.Repository;

import java.math.BigDecimal;

@Repository
public interface UserAccountRepository extends JpaRepository<UserAccount, Long> {

    UserAccount findByUserId(Long userId);
    //通过userId更新余额
    @Modifying
    @Query("update UserAccount u set u.balance=?1 where  u.userId=?2")
    void updateBalanceByUserId(BigDecimal balance, Long userId);
}

创建UserAccountService和实现类,并实现其扣减方法:

import com.tian.user.entity.UserAccount;

import java.math.BigDecimal;

public interface UserAccountService { 
    /**
     * 扣减余额
     * @param userId 当前用户userId
     * @param balance 当前需要减余额
     * @return 是否扣减成功
     */
    boolean reduceBalance(Long userId, BigDecimal balance);
}
import com.tian.user.entity.UserAccount;
import com.tian.user.lock.DistributedLockV1;
import com.tian.user.repository.UserAccountRepository;
import com.tian.user.service.UserAccountService;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.Date;

@Service
@Transactional(rollbackFor = Exception.class)
public class UserAccountServiceImpl implements UserAccountService {
    private Logger logger= LoggerFactory.getLogger(getClass());
    @Resource
    private UserAccountRepository userAccountRepository;
    @Resource
    private DistributedLockV1 distributedLockV1; 

    @Override
    public boolean reduceBalance(Long userId, BigDecimal balance) {
        try {
            //把该账户给锁住,使用userId作为key。
            boolean lock = distributedLockV1.lock(userId.toString());
            //获取锁失败,则直接返回扣减失败
            if (!lock) {
                return false;
            }
            UserAccount userAccount = userAccountRepository.findByUserId(userId);
            BigDecimal currBalance = userAccount.getBalance();
            //校验余额是否足够扣减
            if (currBalance.compareTo(balance) > 0) {
                BigDecimal newBalance = currBalance.subtract(balance);
                //扣减余额
                userAccountRepository.updateBalanceByUserId(newBalance, userId);
                return true;
            }
        }catch(Exception ex){
            logger.error("余额扣减失败", ex);
        } finally {
            //释放锁
            distributedLockV1.unlock(userId.toString());
        }
        return false;
    }
}

到此,简单版的分布式锁,以及如何使用,这里就已经搞完了。下面我们来理一下思路:

1.使用setnx(set not exist),就是如何set的这个key在redis不存在就返回true,否则返回false。

2.对已经set的key设置有效期,使用expire设置有效期。

3.校验我们的可用余额是否足够扣减,不够就直接结束并使用delete删除redis中的key。

4.扣减余额,更新数据库余额值。

5.删除key,delete redis中key。

那么,问题来了,第一步、第二步都成功了。但假如第三步查询余额、扣减余额耗时20秒了,上面我们对Redis中key的有效期设置的10秒,也就是超时了,key过期了,并且在10秒到20秒之间又有其他线程来获取到锁了,然后此时把其他线程拿到的锁给删了,把其他线程的锁给解了。此时,不就乱了吗?

这也是面试中常被问使用redis做分布式锁,业务超时了怎么办?

升级版

我们可以把每次key对应的value返回,当释放锁的时候,判断当前key对应的value是否是当前手里持有的value。

然后,我们针对上面的进行修改一版。

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

@Component
public class DistributedLockV1 {
    private Logger logger = LoggerFactory.getLogger(DistributedLockV1.class);
    @Resource
    private RedisTemplate redisTemplate;

    public String lockV2(String businessKey) {
        boolean result = false;
        String uniqueValue = UUID.randomUUID().toString();
        try {
            result = redisTemplate.opsForValue().setIfAbsent(businessKey, uniqueValue);
            if (!result) {
                return null;
            }
            redisTemplate.expire(businessKey, 100, TimeUnit.SECONDS);
            return uniqueValue;
        } catch (Exception ex) {
            logger.error("获取锁失败", ex);
        }
        return null;
    }

    public void unlockV2(String businessKey, String businessValue) {
        try {
            Object value = redisTemplate.opsForValue().get(businessKey);
            if (value == null) {
                return e;
            }
            //当前key在redis中value和当前线程手里持有的是否一致
            if (!businessValue.equals(value)) {
                //不一致,证明被其他线程获取了
                logger.info("key={}释放锁失败吗,该锁已被其他线程获取",businessKey);
                return ;
            }
            redisTemplate.delete(businessKey);
            logger.info("key={}释放锁成功",businessKey);
        } catch (Exception ex) {
            logger.error("释放锁失败", ex);
        }
    }
}

这里比简单版多了一个判断,判断持有锁的线程是否为当前线程。尽管使用随机字符串的 value来判断是否为当前线程,但是在释放锁时(delete方法),还是无法做到原子操作,比如进程 A 执行完业务逻辑,在准备释放锁时,恰好这时候进程 A 的锁自动过期时间到了,而另一个进程 B 获得锁成功,然后 B 还没来得及执行,进程 A 就执行了 delete(key) ,释放了进程 B 的锁.... ,因此需要配合 Lua 脚本释放锁。

setnx+Lua脚本

Lua 是一种轻量小巧的脚本语言,用标准 C 语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。

Lua 提供了交互式编程模式。我们可以在命令行中输入程序并立即查看效果。

lua脚本优点:

在resources目录下创建一个redis-lock.lua文件。填入内容:

if redis.call('get', KEYS[1]) == ARGV[1]
    then
        return redis.call('del', KEYS[1])
    else
        return 0
end

这段代码的意思。就是通过key获得其在redis中value,然后使用当前线程手里的value与之对比,一样则删除redis这个key。删除返回1,否则返回0表示什么没做。

Redis锁代码块如下:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

@Component
public class DistributedLockV1 {

    private Logger logger = LoggerFactory.getLogger(DistributedLockV1.class);

    @Resource
    private RedisTemplate redisTemplate;

    private DefaultRedisScript<Long> script;

    @PostConstruct
    public void init() {
        script = new DefaultRedisScript<Long>();
        script.setResultType(Long.class);
        script.setScriptSource(new ResourceScriptSource(new ClassPathResource("redis-lock.lua")));
    }

    public String lockV3(String key) {
        String value = UUID.randomUUID().toString().replace("-", "");

        /*
         * setIfAbsent <=> SET key value [NX] [XX] [EX <seconds>] [PX [millseconds]]
         * set expire time 5 mins
         */
        Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, value, 10000, TimeUnit.MILLISECONDS);
        if (flag) {
            return value;
        }
        return null;
    }

    public void unlockV3(String key, String value) {
        /** 业务逻辑处理完毕,释放锁 **/
        String lockValue = (String) redisTemplate.opsForValue().get(key);
        if (lockValue != null && lockValue.equals(value)) {
            System.out.println("lockValue========:" + lockValue);
            List<String> keys = new ArrayList<>();
            keys.add(key);
            Object execute = redisTemplate.execute(script, keys, lockValue);
            System.out.println("execute执行结果,1表示执行del,0表示未执行 ===== " + execute);
            logger.info("{} 解锁成功,结束处理业务", key);
            return;
        }
        logger.info("key={}释放锁失败", key);
    }
}

最后我们再次执行罚款扣减,日志输出:

lockValue========:199740e62c184a6a9897f9c95e720b4d
execute执行结果,1表示执行del,0表示未执行 ===== 1
2021-03-09 19:03:51.592  INFO 6692 --- [nio-8080-exec-4] com.tian.user.lock.DistributedLockV1     : 1 解锁成功,结束处理业务

到此,setnx+Lua 这种方案我们已经实现了。如果对此有怀疑的,是好事,建议创建多个线程去调用罚款扣减这个service方法,看看器是否会出现问题。

「注意」

setnx在redis较低的版本里是没有的,后面才引入的。其实我们也可以使用set命令来解决setnx,另外还可以加过期时间,整体命令为

set key value nx px xxx

value 最好是随机字符串,这样可以防止业务代码执行时间超过设置的锁自动过期时间,而导致再次释放锁时出现释放其他进程锁的情况。

setnx 琐最大的缺点就是它加锁时只作用在一个 Redis 节点上,即使 Redis 通过 Sentinel(哨岗、哨兵) 保证高可用,如果这个 master 节点由于某些原因发生了主从切换,那么就会出现锁丢失的情况,下面是个例子:

  1. 在 Redis 的 master 节点上拿到了锁;
  2. 但是这个加锁的 key 还没有同步到 slave 节点;
  3. master 故障,发生故障转移,slave 节点升级为 master节点;
  4. 上边 master 节点上的锁丢失。

有的时候甚至不单单是锁丢失这么简单,新选出来的 master 节点可以重新获取同样的锁,出现一把锁被拿两次的场景。

由此可知,锁被获取两次,肯定不能满足安全性了。

尽管前两种方案不是很如意,总是有些问题,但也有被部分企业采用,下面我们就来看基于Redis来实现分布式锁更高级的版本。

高级版 Redisson + RedLock

Redisson 是 java 的 Redis 客户端之一,是 Redis 官网推荐的 java 语言实现分布式锁的项目。

Redisson 提供了一些 api 方便操作 Redis。因为本文主要以锁为主,所以接下来我们主要关注锁相关的类,以下是 Redisson 中提供的多样化的锁:

总之,管你了解不了解,反正 Redisson 就是提供了一堆锁... 也是目前大部分公司使用 Redis 分布式锁最常用的一种方式。

整体加锁和解锁的代码结构如下:

RLock lock = redissonClient.getLock("xxx");

lock.lock();

try {
    ...
} finally {
    lock.unlock();
}

其实,加锁和解锁的磁层也是使用Lua脚本来实现的,有兴趣的朋友可以去翻看一下器底层源码。

由于篇幅问题,还涉及到Redis集群,所以这里就给出加锁和解锁的流程图,仅供参考。

「加锁过程」

「解锁过程」

总结

通常我们为了实现 Redis 的高可用,一般都会搭建 Redis 的集群模式,比如给 Redis 节点挂载一个或多个 slave 从节点,然后采用哨兵模式进行主从切换。但由于 Redis 的主从模式是异步的,所以可能会在数据同步过程中,master 主节点宕机,slave 从节点来不及数据同步就被选举为 master 主节点,从而导致数据丢失,大致过程如下:

  1. 用户在 Redis 的 master 主节点上获取了锁;
  2. master 主节点宕机了,存储锁的 key 还没有来得及同步到 slave 从节点上;
  3. slave 从节点升级为 master 主节点;
  4. 用户从新的 master 主节点获取到了对应同一个资源的锁,同把锁获取两次。

ok,然后为了解决这个问题,Redis 作者提出了 RedLock 算法,步骤如下(五步):

在下面的示例中,我们假设有 5 个完全独立的 Redis Master 节点,他们分别运行在 5 台服务器中,可以保证他们不会同时宕机。

  1. 获取当前 Unix 时间,以毫秒为单位。
  2. 依次尝试从 N 个实例,使用相同的 key 和随机值获取锁。在步骤 2,当向 Redis 设置锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为 10 秒,则超时时间应该在 5-50 毫秒之间。这样可以避免服务器端 Redis 已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试另外一个 Redis 实例。
  3. 客户端使用当前时间减去开始获取锁时间(步骤 1 记录的时间)就得到获取锁使用的时间。当且仅当从大多数(这里是 3 个节点)的 Redis 节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
  4. 如果取到了锁,key 的真正有效时间等于有效时间减去获取锁所使用的时间(步骤 3 计算的结果)。
  5. 如果因为某些原因,获取锁失败(没有在至少 N/2+1 个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的 Redis 实例上进行解锁(即便某些 Redis 实例根本就没有加锁成功)。

到这,基本看出来,只要是大多数的 Redis 节点可以正常工作,就可以保证 Redlock 的正常工作。这样就可以解决前面单点 Redis 的情况下我们讨论的节点挂掉,由于异步通信,导致锁失效的问题。

但是细想后, Redlock 还是存在如下问题:

假设一共有5个Redis节点:A, B, C, D, E。设想发生了如下的事件序列:

  1. 客户端1 成功锁住了 A, B, C,获取锁成功(但 D 和 E 没有锁住)。
  2. 节点 C 崩溃重启了,但客户端1在 C 上加的锁没有持久化下来,丢失了。
  3. 节点 C 重启后,客户端2 锁住了 C, D, E,获取锁成功。
  4. 这样,客户端1 和 客户端2 同时获得了锁(针对同一资源)。

哎,还是不能解决故障重启后带来的锁的安全性问题...

针对节点重后引发的锁失效问题,Redis 作者又提出了 延迟重启 的概念,大致就是说,一个节点崩溃后,不要立刻重启他,而是等到一定的时间后再重启,等待的时间应该大于锁的过期时间,采用这种方式,就可以保证这个节点在重启前所参与的锁都过期,听上去感觉 延迟重启 解决了这个问题...

但是,还是有个问题,节点重启后,在等待的时间内,这个节点对外是不工作的。那么如果大多数节点都挂了,进入了等待,就会导致系统的不可用,因为系统在过期时间内任何锁都无法加锁成功。

总之,在 Redis 分布式锁的实现上还有很多问题等待解决,我们需要认识到这些问题并清楚如何正确实现一个 Redis 分布式锁,然后在工作中合理的选择和正确的使用分布式锁。

但为什么又说,有一部分企业采用Redis来实现分布式锁呢?其实实现分布式锁,从中间件上来选,也有 Zookeeper 可选,并且 Zookeeper 可靠性比 Redis 强太多,但是效率是低了点,如果并发量不是特别大,追求可靠性,那么肯定首选 Zookeeper。

关于分布式锁的实现方案,没有绝对的好与坏,没有最好的方案,只有最适合你的业务的方案。

以下两种方案仅供参考:

「彩蛋」更深度的可以看看redis 作者写的redlock算法的文章,以及一篇反对的文章。这两篇文章非常有趣,感兴趣的朋友可以网上找找这两篇文章,然后拜读一番。

参考

http://ii081.cn/847dl http://ii081.cn/BNYBm

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8