前言
Redis也是支持事务命令的,一般事务支持A(原子性)、I(隔离性)、D(持久性)C、(一致性),Mysql实现事务得解决各种问题,实现上也有一定难度,对于Redis来说需要通过源码来看一下事务到底支持到什么程度,有什么特点?
正文
MULTI
get key
set key 1
EXEC
复制代码
Redis的事务使用MULTI当作开始,以后输入命令不会执行,最终输入EXEC来执行所有命令。
typedef struct multiCmd {
robj **argv;
int argc;
struct redisCommand *cmd;
} multiCmd;
typedef struct multiState {
multiCmd *commands; /* 命令数组 */
int count; /* 命令数量*/
int cmd_flags; /* 命令flag累计 */
int cmd_inv_flags;
} multiState;
复制代码
Redis创建了一个multiState的结构体来保存命令数组以及其他信息
void multiCommand(client *c) {
if (c->flags & CLIENT_MULTI) {
addReplyError(c,"MULTI calls can not be nested");
return;
}
c->flags |= CLIENT_MULTI;
addReply(c,shared.ok);
}
复制代码
当我们首次输入multi命令的时候,只会将客户端设置会MULTI模式,multi不能嵌套。而当我们输入其他命令时,会经过前面文章解析过的processCommand方法
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
queueMultiCommand(c);
addReply(c,shared.queued);
复制代码
判断到client处于multi模式中,会将命令放入queue中,也就是multiState中的数组里面
void queueMultiCommand(client *c) {
multiCmd *mc;
int j;
if (c->flags & CLIENT_DIRTY_EXEC)
return;
c->mstate.commands = zrealloc(c->mstate.commands,
sizeof(multiCmd)*(c->mstate.count+1));
mc = c->mstate.commands+c->mstate.count;
mc->cmd = c->cmd;
mc->argc = c->argc;
mc->argv = zmalloc(sizeof(robj*)*c->argc);
memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
for (j = 0; j < c->argc; j++)
incrRefCount(mc->argv[j]);
c->mstate.count++;
c->mstate.cmd_flags |= c->cmd->flags;
c->mstate.cmd_inv_flags |= ~c->cmd->flags;
}
复制代码
重新分配内存存放当前命令
void execCommand(client *c) {
int j;
robj **orig_argv;
int orig_argc;
struct redisCommand *orig_cmd;
int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */
int was_master = server.masterhost == NULL;
if (!(c->flags & CLIENT_MULTI)) {
addReplyError(c,"EXEC without MULTI");
return;
}
//检查是否要打断事物 watchkey被改动 | 之前的命令存在错误
if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {
addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :
shared.nullarray[c->resp]);
discardTransaction(c);
goto handle_monitor;
}
//unwatch所有key 因为事物是原子的 不用担心过程中被修改
unwatchAllKeys(c);
复制代码
exec执行时候会首先判断是否处于multi中,然后还要判断被watch的key是否被修改,关于watch接下来会分析源码。
orig_argv = c->argv;
orig_argc = c->argc;
orig_cmd = c->cmd;
addReplyArrayLen(c,c->mstate.count);
for (j = 0; j < c->mstate.count; j++) {
c->argc = c->mstate.commands[j].argc;
c->argv = c->mstate.commands[j].argv;
c->cmd = c->mstate.commands[j].cmd;
if (!must_propagate &&
!server.loading &&
!(c->cmd->flags & (CMD_READONLY|CMD_ADMIN)))
{
execCommandPropagateMulti(c);
must_propagate = 1;
}
int acl_keypos;
int acl_retval = ACLCheckCommandPerm(c,&acl_keypos);
if (acl_retval != ACL_OK) {
addACLLogEntry(c,acl_retval,acl_keypos,NULL);
addReplyErrorFormat(c,
"-NOPERM ACLs rules changed between the moment the "
"transaction was accumulated and the EXEC call. "
"This command is no longer allowed for the "
"following reason: %s",
(acl_retval == ACL_DENIED_CMD) ?
"no permission to execute the command or subcommand" :
"no permission to touch the specified keys");
} else {
call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL);
}
//进行重新赋值 不然有些随机命令会导致不一致
c->mstate.commands[j].argc = c->argc;
c->mstate.commands[j].argv = c->argv;
c->mstate.commands[j].cmd = c->cmd;
}
c->argv = orig_argv;
c->argc = orig_argc;
c->cmd = orig_cmd;
//清理事务
discardTransaction(c);
复制代码
可以看到使用了orig_argv、orig_argc和orig_cmd来备份原始命令,因为在主从模式下,为了确保修改的数据一致,在执行随机命令的时候会修改命令的原文,所以需要备份;处理acl后调用call来执行命令。
从这里可以看出来,redis的事务不支持原子性的,从这个循环没有对命令进行失败处理,命令是可以部分成功部分失败的。
在exec中有unwatchAllKeys的代码。watch相当于是乐观锁,在执行multi之前可以用watch命令去对key加锁来确保exec执行的时候该key不会被修改,如果该key被其他客户端修改了,那么该事务会中断。
void watchCommand(client *c) {
int j;
if (c->flags & CLIENT_MULTI) {
addReplyError(c,"WATCH inside MULTI is not allowed");
return;
}
for (j = 1; j < c->argc; j++)
watchForKey(c,c->argv[j]);
addReply(c,shared.ok);
}
复制代码
事务开始后是无法watch的,调用watchForKey方法
typedef struct watchedKey {
robj *key;
redisDb *db;
} watchedKey;
clients = dictFetchValue(c->db->watched_keys,key);
if (!clients) {
clients = listCreate();
dictAdd(c->db->watched_keys,key,clients);
incrRefCount(key);
}
listAddNodeTail(clients,c);
/* Add the new key to the list of keys watched by this client */
wk = zmalloc(sizeof(*wk));
wk->key = key;
wk->db = c->db;
incrRefCount(key);
listAddNodeTail(c->watched_keys,wk);
复制代码
redisDb里面用了一个dict来存放watched_keys的client,首先会将cilent添加到dict里面,然后添加到watched_keys的list里面
还记得set方法会调用genericSetKey吗?里面有这样一句
if (signal) signalModifiedKey(c,db,key);
void signalModifiedKey(client *c, redisDb *db, robj *key) {
touchWatchedKey(db,key);
trackingInvalidateKey(c,key);
}
void touchWatchedKey(redisDb *db, robj *key) {
list *clients;
listIter li;
listNode *ln;
if (dictSize(db->watched_keys) == 0) return;
clients = dictFetchValue(db->watched_keys, key);
if (!clients) return;
/* Mark all the clients watching this key as CLIENT_DIRTY_CAS */
/* Check if we are already watching for this key */
listRewind(clients,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags |= CLIENT_DIRTY_CAS;
}
}
复制代码
signalModifiedKey会调用touchWatchedKey去修改客户端的flag,设置为CLIENT_DIRTY_CAS,在执行exec判断到CLIENT_DIRTY_CAS会中断执行。
总结
redis的事务并不是完整执行AIDC。通过源码分析,事务只支持了隔离性,因为客户端之间是隔离的。而一致性和原子性都不支持,也只能通过watch命令来模拟CAS。最后持久性也不一定支持,因为redis的RDB或者AOF都不是实时执行的,还是有丢失的风险。