Redis6.0.6源码阅读——事务

前言

Redis也是支持事务命令的,一般事务支持A(原子性)、I(隔离性)、D(持久性)C、(一致性),Mysql实现事务得解决各种问题,实现上也有一定难度,对于Redis来说需要通过源码来看一下事务到底支持到什么程度,有什么特点?

正文

MULTI
get key
set key 1
EXEC
复制代码

Redis的事务使用MULTI当作开始,以后输入命令不会执行,最终输入EXEC来执行所有命令。

typedef struct multiCmd {
    robj **argv;
    int argc;
    struct redisCommand *cmd;
} multiCmd;

typedef struct multiState {
    multiCmd *commands;     /* 命令数组 */
    int count;              /* 命令数量*/
    int cmd_flags;          /* 命令flag累计 */
    int cmd_inv_flags;
} multiState;
复制代码

Redis创建了一个multiState的结构体来保存命令数组以及其他信息

void multiCommand(client *c) {
    if (c->flags & CLIENT_MULTI) {
        addReplyError(c,"MULTI calls can not be nested");
        return;
    }
    c->flags |= CLIENT_MULTI;
    addReply(c,shared.ok);
}
复制代码

当我们首次输入multi命令的时候,只会将客户端设置会MULTI模式,multi不能嵌套。而当我们输入其他命令时,会经过前面文章解析过的processCommand方法

if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        queueMultiCommand(c);
        addReply(c,shared.queued);
复制代码

判断到client处于multi模式中,会将命令放入queue中,也就是multiState中的数组里面

void queueMultiCommand(client *c) {
    multiCmd *mc;
    int j;
    
    if (c->flags & CLIENT_DIRTY_EXEC)
        return;

    c->mstate.commands = zrealloc(c->mstate.commands,
            sizeof(multiCmd)*(c->mstate.count+1));
    mc = c->mstate.commands+c->mstate.count;
    mc->cmd = c->cmd;
    mc->argc = c->argc;
    mc->argv = zmalloc(sizeof(robj*)*c->argc);
    memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
    for (j = 0; j < c->argc; j++)
        incrRefCount(mc->argv[j]);
    c->mstate.count++;
    c->mstate.cmd_flags |= c->cmd->flags;
    c->mstate.cmd_inv_flags |= ~c->cmd->flags;
}
复制代码

重新分配内存存放当前命令

void execCommand(client *c) {
    int j;
    robj **orig_argv;
    int orig_argc;
    struct redisCommand *orig_cmd;
    int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */
    int was_master = server.masterhost == NULL;

    if (!(c->flags & CLIENT_MULTI)) {
        addReplyError(c,"EXEC without MULTI");
        return;
    }

    //检查是否要打断事物 watchkey被改动 | 之前的命令存在错误
    if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {
        addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :
                                                   shared.nullarray[c->resp]);
        discardTransaction(c);
        goto handle_monitor;
    }

    //unwatch所有key 因为事物是原子的 不用担心过程中被修改
    unwatchAllKeys(c);
复制代码

exec执行时候会首先判断是否处于multi中,然后还要判断被watch的key是否被修改,关于watch接下来会分析源码。

    orig_argv = c->argv;
    orig_argc = c->argc;
    orig_cmd = c->cmd;
    addReplyArrayLen(c,c->mstate.count);
    for (j = 0; j < c->mstate.count; j++) {
        c->argc = c->mstate.commands[j].argc;
        c->argv = c->mstate.commands[j].argv;
        c->cmd = c->mstate.commands[j].cmd;

        if (!must_propagate &&
            !server.loading &&
            !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN)))
        {
            execCommandPropagateMulti(c);
            must_propagate = 1;
        }

        int acl_keypos;
        int acl_retval = ACLCheckCommandPerm(c,&acl_keypos);
        if (acl_retval != ACL_OK) {
            addACLLogEntry(c,acl_retval,acl_keypos,NULL);
            addReplyErrorFormat(c,
                "-NOPERM ACLs rules changed between the moment the "
                "transaction was accumulated and the EXEC call. "
                "This command is no longer allowed for the "
                "following reason: %s",
                (acl_retval == ACL_DENIED_CMD) ?
                "no permission to execute the command or subcommand" :
                "no permission to touch the specified keys");
        } else {
            call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL);
        }

        //进行重新赋值 不然有些随机命令会导致不一致
        c->mstate.commands[j].argc = c->argc;
        c->mstate.commands[j].argv = c->argv;
        c->mstate.commands[j].cmd = c->cmd;
    }
    c->argv = orig_argv;
    c->argc = orig_argc;
    c->cmd = orig_cmd;
    //清理事务
    discardTransaction(c);
复制代码

可以看到使用了orig_argv、orig_argc和orig_cmd来备份原始命令,因为在主从模式下,为了确保修改的数据一致,在执行随机命令的时候会修改命令的原文,所以需要备份;处理acl后调用call来执行命令。

从这里可以看出来,redis的事务不支持原子性的,从这个循环没有对命令进行失败处理,命令是可以部分成功部分失败的。


在exec中有unwatchAllKeys的代码。watch相当于是乐观锁,在执行multi之前可以用watch命令去对key加锁来确保exec执行的时候该key不会被修改,如果该key被其他客户端修改了,那么该事务会中断。

void watchCommand(client *c) {
    int j;

    if (c->flags & CLIENT_MULTI) {
        addReplyError(c,"WATCH inside MULTI is not allowed");
        return;
    }
    for (j = 1; j < c->argc; j++)
        watchForKey(c,c->argv[j]);
    addReply(c,shared.ok);
}
复制代码

事务开始后是无法watch的,调用watchForKey方法

typedef struct watchedKey {
    robj *key;
    redisDb *db;
} watchedKey;




clients = dictFetchValue(c->db->watched_keys,key);
    if (!clients) {
        clients = listCreate();
        dictAdd(c->db->watched_keys,key,clients);
        incrRefCount(key);
    }
    listAddNodeTail(clients,c);
    /* Add the new key to the list of keys watched by this client */
    wk = zmalloc(sizeof(*wk));
    wk->key = key;
    wk->db = c->db;
    incrRefCount(key);
    listAddNodeTail(c->watched_keys,wk);
复制代码

redisDb里面用了一个dict来存放watched_keys的client,首先会将cilent添加到dict里面,然后添加到watched_keys的list里面

还记得set方法会调用genericSetKey吗?里面有这样一句

if (signal) signalModifiedKey(c,db,key);

void signalModifiedKey(client *c, redisDb *db, robj *key) {
    touchWatchedKey(db,key);
    trackingInvalidateKey(c,key);
}

void touchWatchedKey(redisDb *db, robj *key) {
    list *clients;
    listIter li;
    listNode *ln;

    if (dictSize(db->watched_keys) == 0) return;
    clients = dictFetchValue(db->watched_keys, key);
    if (!clients) return;

    /* Mark all the clients watching this key as CLIENT_DIRTY_CAS */
    /* Check if we are already watching for this key */
    listRewind(clients,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        c->flags |= CLIENT_DIRTY_CAS;
    }
}
复制代码

signalModifiedKey会调用touchWatchedKey去修改客户端的flag,设置为CLIENT_DIRTY_CAS,在执行exec判断到CLIENT_DIRTY_CAS会中断执行。

总结

redis的事务并不是完整执行AIDC。通过源码分析,事务只支持了隔离性,因为客户端之间是隔离的。而一致性和原子性都不支持,也只能通过watch命令来模拟CAS。最后持久性也不一定支持,因为redis的RDB或者AOF都不是实时执行的,还是有丢失的风险。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享