Redis事务&RedisTemplate事务使用,看这一篇就够了!

476次阅读  |  发布于2年以前

在得物技术体系中,大量使用Redis作为缓存中间件,以应对高并发下的大流量场景。在使用缓存时,不得不考虑数据一致性问题,即保证缓存中的数据和DB始终可以保持一致。常规的解决缓存一致性的方案一般为先修改DB并提交事物,再操作缓存更新或者失效,为了应对极端场景往往会再采用延迟操作的方式进行缓存的二次处理。

但实际开发中,遇到很多代码不规范的场景,在JDBC事务中进行缓存删除或者更新等操作,带来的问题是当JDBC事务未提交就完成了Redis的操作,容易造成二者数据不一致。所以我们思考:既然Redis本身也提供了事务的解决方案,那能不能将Redis事务和DB的事务进行结合,来保证数据一致性的问题呢?接下来我们就带着这个问题看看看Redis事务的实现、使用,最终探索一下将Redis操作结合DB事务使用的可能性。

一、Redis事务实现

Redis事务涉及命令MULTI、EXEC、WATCH、DISCARD、UNWATCH,命令含义如下:

命令简单,不展开讲,主要看整个事务实现过程,从RedisServer源码入手,看以下几个重点的结构定义:

/* Global server state structure */
struct redisServer {
    ... //省略所有不关心的属性

    list *clients; //客户端链表
};

RedisServer的全局结构,省略所有不关心的属性,重点看clients属性(双向列表),实际存储RedisClient结构,当客户端到服务端连接建立时则会创建RedisClient结构(networking.c),继续看RedisClient结构定义。

// 每个Connection对应一个RedisClient结构
typedef struct redisClient {
    //事务状态
    multiState mstate;
} redisClient;

从RedisClient的定义可以看出,主要存储事务状态信息(意味着事务状态主要记录在Server端),至于事务状态信息具体什么结构,往下看。

// Redis命令Queue
typedef struct multiState {
    //事务队列,FIFO
    multiCmd *commands;
    //已入队命令计数器
    int count;
} multiState;

MultiState实际是一个FIFO的队列,继续看下MultiCmd定义。

// 命令节点:记录加入队列命令信息
typedef struct multiCmd {
    //参数
    robj **argv;
    //参数数量
    int argc;
    //命令指针
    struct redisCommand *cmd;
} multiCmd;

MultiCmd就是记录每一个命令的节点。

从上面一连串数据结构定义基本可以看出RedisServer对Multi事务的实现本质是一个FIFO的命令收集队列,等待执行——事务提交(当然也可以选择放弃)。

由于Redis事务采用的是命令收集方式,这种方式并不是友好的事务实现,或者说不是真实事务场景(类比JDBC事务,立即发生,采用redo和undo日志保证一致性),是一种滞后的行为。

二、Redis事务的问题

2.1 不支持读写混合逻辑

命令收集意味着命令并不是立即执行,而是加入队列,你拿不到命令执行的结果,当你需要根据某个已有值(读取)进行逻辑区分时,此场景不可实现。实例代码如下:

// 读写混合逻辑
String evaluate_value = redisTemplate.opsForValue().get("key"); // read
if ("condition_value".equals(evaluate_value)) { // 根据已有值进行逻辑区分
    redisTemplate.opsForValue().set("key1", "value1"); // write
} else {
    redisTemplate.opsForValue().set("key1", "value2"); // write
}

个人认为更佳致命的设计在于既然不支持事务内读操作,那就明确报错(也不是很友好)禁用API,或者给其他的命令来解决,否则同一段代码其语意逻辑在不同场景表现不同,很容易引起Bug(甚至是事故)。

2.2 无法回滚

此处所说的回滚并不是指DISCARD命令,DISCARD命令只是丢弃整个队列,这里讲的是一旦提交EXEC命令,所有命令将FIFO的原则逐个执行,一但某个命令执行失败,并不会终止,而是继续往下执行,直到所有命令被执行完成。

当然,EXEC命令并非每次都是成功的,比如监测到WATCH的Keys有发生改变,则Exec执行失败。

2.3 Redis Cluster不支持事务

Redis官方说明集群模式不支持事务,甚至是不支持multi的所有操作,如MSET,MGET等,主要原因是key的分散(命令需要MOVE到其他节点)。但实际并不是绝对,只要Redis客户端允许(未禁用)命令(JedisCluster报错),执行MULTI和EXEC也是可以的,并不会每次都返回错误,当事务提交所有命令涉及的key都能在对应的服务节点上完成时,不会报错。

另外,Redis Cluster如果存在反向代理服务来暴露服务,同样是可以通过代理来完成事务,比如阿里云的Redis Cluster,兼容所有集群命令提交。

前面了解了Redis事务实现,接下来了解下RedisTemplate。

三、RedisTemplate简介

RedisTemplate是Spring Data Redis提供给用户的最高级的抽象Redis Client,抽象封装主要是三个方面:

RedisTemplate也考虑到Redis的一些高级用法(事务Session、Pipeline、暴露Connection),故还是保留一些可重载的方法(所有execute开通的方法),方便使用者使用。

RedisTemplate作为Client抽象层,默认支持Jedis和Lettuce Client,当然也可以自行实现支持Redisson(需实现RedisClientProvider等相关工作)。下面具体看下各个客户端的特点,有助于侧面理解RedisTemplate封装连接管理的逻辑。

客户端 特点
Jedis 官方版本,Redis的操作特性支持全面
使用阻塞的I/O,且其方法调用都是同步的,程序流需要等到sockets处理完I/O才能执行,不支持异步
Jedis客户端实例不是线程安全的,所以需要通过连接池来使用Jedis
Lettuce Redis高级客户端,较好支持分布应用使用场景
Lettuce的API是线程安全的,所以可以操作单个Lettuce连接来完成各种操作
Redisson 重点关注Redis分布应用使用场景:分布式锁,分布式集合,可通过Redis支持延迟队列
基于Netty框架的事件驱动的通信层,其方法调用是异步的
Redisson的API是线程安全的,所以可以操作单个Redisson连接来完成各种操作

可以看出,客户端分两类,主要在于连接实现是否采用Non-blocking I/O,基于NIO的实现方式,连接本身可以并发复用(Jedis基于Blocking I/O,一个连接同一时间仅给一个线程使用,归还连接池后才可再分配线程使用,而Lettuce和Redisson基于Non-blocking I/O,多个线程可以往同一个fd_socket上写),也就是说线程安全的,但是Jedis却正好相反,可以推断,RedisTemplate需要兼容这些特性。

连接管理封装方向:

PS:文章写到这里,敏感的同学应该可以想到,Redis事务状态存储在服务端,而客户端连接又是可以并发复用的,这自然会遇到一个棘手问题,如何保证事务状态不会串,一但事务状态错乱,将出现以下问题:

四、RedisTemplate事务使用

前面简单了解了Redis事务实现以及RedisTemplate封装,接下来看看RedisTemplate是如何正确使用事务。

4.1 通过SessionCallback简单使用

使用SessionCallback来包装所有事务命令,如下代码

public <T> T execute(SessionCallback<T> session) {
    Assert.isTrue(this.initialized, "template not initialized; call afterPropertiesSet() before using it");
    Assert.notNull(session, "Callback object must not be null");
    RedisConnectionFactory factory = this.getRequiredConnectionFactory();
    RedisConnectionUtils.bindConnection(factory, this.enableTransactionSupport);

    Object var3;
    try {
        var3 = session.execute(this);
    } finally {
        RedisConnectionUtils.unbindConnection(factory);
    }

    return var3;

}

通过前面一些预备知识,可以理解到,RedisTemplate execute SessionCallback实际就是先做了连接绑定,我们可以去看看源码:

public <T> T execute(SessionCallback<T> session) {
    Assert.isTrue(this.initialized, "template not initialized; call afterPropertiesSet() before using it");
    Assert.notNull(session, "Callback object must not be null");
    RedisConnectionFactory factory = this.getRequiredConnectionFactory();
    RedisConnectionUtils.bindConnection(factory, this.enableTransactionSupport);
    Object var3;
    try {
        var3 = session.execute(this);
    } finally {
        RedisConnectionUtils.unbindConnection(factory);
    }
    return var3;
}

反之,如果不使用SessionCallback来包装所有命令,会遇到什么后果(可以根据前面学习知识去思考,事务状态会出现错乱),如下代码:

redisTemplate.multi();
redisTemplate.opsForValue().set("key1", "value1"); // cmd1
redisTemplate.opsForValue().set("key2", "value2"); // cmd2
Object result = redisTemplate.exec();

实际上面这段代码运行可能不会遇到你想的结果,比如编写Demo的测试并不会遇到复杂的多线程并发场景,另外也不见得你特意使用了Lettuce并配置了连接池(Lettuce可以单连接模式,早期版本仅支持单连接,默认是单连接),在使用Jedis下客户端情况下,也不会遇到此问题,上下文的连接实际是同一个,绑不绑定都不重要。

特别注意:虽然使用 SessionCallback 来包装执行事务命令,但是请确保事务能结束掉,连接池不保证每次用完连接一定会 colse。

// 事务执行;正常使用
Object result = redisTemplate.execute(new SessionCallback<Object>() {
    @Override    
    public Object execute(RedisOperations redisOperations) throws DataAccessException {
        redisOperations.multi();
        try {
            redisOperations.opsForValue().set("key1", "value1"); // cmd1
            redisOperations.opsForValue().set("key2", "value2"); // cmd2
            return redisOperations.exec();
        } catch (Exception e) {
            redisOperations.discard(); // 异常丢弃命令
            return null;
        }
    }
});

4.2 开启EnableTransactionSupport使用

从RedisTemplate源码看出,有一个enableTransactionSupport特别显眼,从字面意思看,是支持事务的意思,另外看下命令执行入口(execute RedisCallback方法)代码可以看出:

@Nullable
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {
    Assert.isTrue(this.initialized, "template not initialized; call afterPropertiesSet() before using it");
    Assert.notNull(action, "Callback object must not be null");

    RedisConnectionFactory factory = this.getRequiredConnectionFactory();
    RedisConnection conn = null;
    Object var11;
    try {
        if (this.enableTransactionSupport) {
            conn = RedisConnectionUtils.bindConnection(factory, this.enableTransactionSupport);
        } else {
            conn = RedisConnectionUtils.getConnection(factory);
        }

        boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);
        RedisConnection connToUse = this.preProcessConnection(conn, existingConnection);
        boolean pipelineStatus = connToUse.isPipelined();
        if (pipeline && !pipelineStatus) {
            connToUse.openPipeline();
        }

        RedisConnection connToExpose = exposeConnection ? connToUse : this.createRedisConnectionProxy(connToUse);
        T result = action.doInRedis(connToExpose);
        if (pipeline && !pipelineStatus) {
            connToUse.closePipeline();
        }

        var11 = this.postProcessResult(result, connToUse, existingConnection);
    } finally {
        RedisConnectionUtils.releaseConnection(conn, factory, this.enableTransactionSupport);
    }

    return var11;
}

如果enableTransactionSupport开启(代码第10行),在上下文中存在事务(特指spring-tx封装的一套上下文事务,常见注解事务:@Transactional)同样会绑定连接,所以如果整个RedisTemplate开启了enableTransactionSupport状态,以下代码则是正确写法,无论是否使用Lettuce连接池模式:

// 函数已注解申明事务 @Transactional
redisTemplate.opsForValue().set("key1", "value1"); // cmd1
redisTemplate.opsForValue().set("key2", "value2"); // cmd2

分析以上代码仍然从连接绑定(RedisConnectionUtils.bindConnection)入手,具体细节不再贴出来,找到RedisConnectionUtils.doGetConnection方法,因为开启enableTransactionSupport,找到RedisConnectionUtils.potentiallyRegisterTransactionSynchronisation即可看出,在上下文存在写事务时,自动开启multi。当时上下文事务提交时,再执行exec或者discard,具体实现代码RedisConnectionUtils.RedisTransactionSynchronizer内部类中。

4.3 EnableTransactionSupport的影响

RedisTemplate开启enableTransactionSupport后,看似事务使用变得轻巧,实际则不然,有以下诟病:

前面两点应该好理解,我就着重解释下第三点什么意思,读写混合,如下代码:

public void mark(String referValue) {
        String value = redisTemplate.opsForValue().get("key"); // cmd1
        if (referValue.equals(value)) {
            redisTemplate.opsForValue().set("key", "value2"); // cmd2
        } else {
            redisTemplate.opsForValue().set("key", "value3"); // cmd3
        }
}

函数在无事务情况下,调用一直正常(逻辑正确),但某一天被其他同事在上下文事务中调用,结果就出问题了。遇到问题很难排除,一是老代码,老逻辑,甚至是二方三方Jar代码,毋庸置疑;二是事务并不是专门为了Redis事务而开启,往往是因为JDBC事务需要(写完DB要操作Redis,这是一个常用场景)。

甚至还有一种场景,因为你配置redisTemplate默认的Bean为enableTransactionSupport后,发现很多二方三方Jar代码都出现逻辑运行错误。这些例子都可以说明,开启enableTransactionSupport的维护成本远远大于他的使用价值。

五、Redis事务使用忠告

本文虽讲解了RedisTemplate事务使用,但是并不推荐大家使用Redis事务,Redis事务实现本身比较简单,甚至说非真正意义上的事务;另外RedisTemplate虽然对redis命令进行封装,但并没有严格约束命令的使用创景,如get命令在multi(pipeline一样)开启时无返回值也不禁用,很容易引发深层次问题,很难排查(读写混合逻辑)。

单纯的Redis事务一般确实没有太多使用场景,如果只考虑原子性完全可以使用lua脚本(实际命令在服务端执行是单线程的),我了解到逼迫我们考虑Redis事务或者说关注enableTransactionSupport,是因JDBC事务中Redis命令提前提交问题,尽管我们总是在DB执行后再执行Redis命令,如下代码

@Transactional
public void update(String name, Intger age) {
    Person person = reposity.getByName(name);
    if (person != null) {
        person.setAge(age);

        // save to db
        reposity.update(person);

        // update cache
        redisTemplate.opsForHash().put(name, "age", age); 

        // send notice
        producer.send(new ProducerRecord<>("PersonChanged", null, name));
    }
}

当消息发送失败,导致事务回滚,实际redis命令早已执行,无法回滚;就算消息发送并没有失败,redis put命令也早于db的update执行,其中导致的后果则是redis与db不一致。

如果要解决以上场景问题,开始 enableTransactionSupport 自然是能解决问题,但是并非最佳手段,如 redis 的缓存异步提交,甚至监听 binlog 再提交,都是不错的选择。如果觉得这样做成本略大,当然也可以自行封装 RedisTemplate,将所有 JDBC 事务中的 Redis 命令收集到 Queue 中,等到事务提交后再执行。

六、Redis结合JDBC事务实现

新增CustomRedisTemplate类,继承spring-data中的RedisTemplate类,重写execute方法。

// 省略部分逻辑
public class CustomRedisTemplate<K, V> extends RedisTemplate<K, V> {


    private  <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline, RedisCallContext context, boolean immediately) {

        // 开启原生事务支持(spring-data)直接执行
        if (enableOriginTransactionSupport) {
            return superExecute(action, exposeConnection, pipeline, cmd, key);
        }

        // 事务完的操作,不用加入到队列 直接执行
        if (isTransactionCompletion()) {
            return superExecute(action, exposeConnection, pipeline, cmd, key);
        }

        // 非即时性的,且写事务中 pending 不立即执行,写入队列中
        if (!immediately && isActualNonReadonlyTransactionActive()) {
            if (context != null) {
                log.debug("Pending execute redis {} {} in a transaction", context.cmd, context.key);
            }
            RedisCallbackItem item = new RedisCallbackItem(action, exposeConnection, pipeline);
            if (context != null) {
                item.description = "" + context.cmd  + " " + context.key;
            }
            RedisTransactionSupportSynchronization synchronization = getTransactionSupportSynchronization();
            // 写入队列中
            synchronization.items.add(item);
            return null;
        }

        return superExecute(action, exposeConnection, pipeline, cmd, key);
    }
}

execute的核心方法为判断当前命令是否需要立即执行,并且是否在事务当中,如果不满足,则调用父类RedisTemplate.execute方法执行;如果满足条件,则写入items队列中,等待执行。

新增RedisTransactionSupportSynchronization类,继承TransactionSynchronizationAdapter类。TransactionSynchronizationAdapter是spring-tx中提供的接口适配器,预留给大家实现事务的扩展使用。


private static class RedisTransactionSupportSynchronization extends TransactionSynchronizationAdapter {

    public RedisTransactionSupportSynchronization(CustomRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    private final CustomRedisTemplate redisTemplate;
    private final List<RedisCallbackItem> items = new ArrayList<>();

// 实现afterCompletion方法,用于JDBC事务完成后执行Redis命令
    @Override
    public void afterCompletion(int status) {
        process(status);
    }

    private void process(int status) {
        // 0:提交 ; 1,2:回滚和终端
        if (status == 0) {
            items.forEach((item) -> {
                // 实际实行Redis命令
                Object result = redisTemplate.execute(item.action,item.exposeConnection,item.pipeline, null, true);
                log.debug("Execute redis {} result:{} after transaction commit.", item.description, result);
            });
        } else {
            items.forEach((item) -> {
                log.warn("Discard redis {} after transaction rollback.", item.description);
            });
        }
    }
}

实现afterCompletion方法,在JDBC事务完成后拿到上面放置items队列中的Redis执行命令,开始执行Redis操作,保证Redis的执行,在JDBC事务之后。

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8