前言
前面看了redis几个常用的数据结构源码,这篇文章从几个常用命令的角度出发,通过命令的执行流程来看一下redis在执行命令的过程中做了什么。
正文
redisObject和redisDB
redisObject
在介绍其他内容之前,先看一redisObject,Redis构造了对象系统,可以根据对象的不同来确定执行不同的命令,还可以优化对象提高使用效率
typedef struct redisObject {
unsigned type:4;
unsigned encoding:4;
unsigned lru:LRU_BITS;
int refcount;
void *ptr;
} robj;
复制代码
参数含义:
- type:类型由以下5种来表示,对应了五个基础类型
#define OBJ_STRING 0 /* 字符串对象. */
#define OBJ_LIST 1 /* List对象. */
#define OBJ_SET 2 /* Set对象. */
#define OBJ_ZSET 3 /* zset对象. */
#define OBJ_HASH 4 /* Hash对象 */
复制代码
- encoding:记录了对象所使用的编码
#define OBJ_ENCODING_RAW 0 /* 简单动态字符串 sds */
#define OBJ_ENCODING_INT 1 /* long类型的整数*/
#define OBJ_ENCODING_HT 2 /* hashtable */
#define OBJ_ENCODING_ZIPLIST 5 /* 压缩链表 */
#define OBJ_ENCODING_INTSET 6 /* 整数集合 */
#define OBJ_ENCODING_SKIPLIST 7 /* 跳表 */
#define OBJ_ENCODING_EMBSTR 8 /* embstr编码的sds */
#define OBJ_ENCODING_QUICKLIST 9 /* quicklist */
#define OBJ_ENCODING_STREAM 10 /* 基数树 */
复制代码
- lru:记录了对象的访问时间 用于lru算法
- refcount:记录了对象的引用计数 >1表示是共享对象
- ptr:实际指针
RedisObject被使用在各个地方,例如set key val命令,在源码中key和val都是用redisObject来表示的,具体细节看后文解析。
redisBD
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */
long long avg_ttl; /* Average TTL, just for stats */
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;
struct redisServer{
redisDb *db;
}
复制代码
可以看到在redis中redisDb由数组组成,使用的时候可以选择db,redisDb就是几个字典,将过期key和watchedKey维护在另外的字典里面,如果一个key配置了过期时间,会同时存在dict和expires字典中。
命令
struct redisCommand {
//名称
char *name;
//具体执行函数
redisCommandProc *proc;
//参数校验
int arity;
//描述
char *sflags;
uint64_t flags;
redisGetKeysProc *getkeys_proc;
/* What keys should be loaded in background when calling this command? */
int firstkey; /* The first argument that's a key (0 = no keys) */
int lastkey; /* The last argument that's a key */
int keystep; /* The step between first and last key */
long long microseconds, calls;
int id;
};
复制代码
以上是方法结构体
struct redisCommand redisCommandTable[] = {
{"module",moduleCommand,-2,
"admin no-script",
0,NULL,0,0,0,0,0,0},
{"get",getCommand,2,
"read-only fast @string",
0,NULL,1,1,1,0,0,0},
/* Note that we can't flag set as fast, since it may perform an
* implicit DEL of a large key. */
{"set",setCommand,-3,
"write use-memory @string",
0,NULL,1,1,1,0,0,0},
{"setnx",setnxCommand,3,
"write use-memory fast @string",
0,NULL,1,1,1,0,0,0},
{"setex",setexCommand,4,
"write use-memory @string",
0,NULL,1,1,1,0,0,0},
复制代码
这是内置方法的数组,可以看到get key对应的函数是getCommand,参数指定为2。
initServerConfig {
....
server.commands = dictCreate(&commandTableDictType,NULL);
populateCommandTable();
....
}
void populateCommandTable(void) {
dictAdd(server.commands, sdsnew(c->name), c);
}
复制代码
服务配置初始化的时候会创建一个字典,将name对应方法结构体加入字典中,形成了一个方法的hashMap
具体执行命令是processCommand的方法,该方法有点长
int processCommand(client *c) {
//模块的命令的过滤器
moduleCallCommandFilters(c);
//quit命令单独处理
if (!strcasecmp(c->argv[0]->ptr,"quit")) {
addReply(c,shared.ok);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
return C_ERR;
}
//通过server.commands来查找命令对应的方法
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
//找不到命令
if (!c->cmd) {
sds args = sdsempty();
int i;
for (i=1; i < c->argc && sdslen(args) < 128; i++)
args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
rejectCommandFormat(c,"unknown command `%s`, with args beginning with: %s",
(char*)c->argv[0]->ptr, args);
sdsfree(args);
return C_OK;
//检验命令的合法性
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) {
rejectCommandFormat(c,"wrong number of arguments for '%s' command",
c->cmd->name);
return C_OK;
}
int is_write_command = (c->cmd->flags & CMD_WRITE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
int is_denyoom_command = (c->cmd->flags & CMD_DENYOOM) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_DENYOOM));
int is_denystale_command = !(c->cmd->flags & CMD_STALE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_STALE));
int is_denyloading_command = !(c->cmd->flags & CMD_LOADING) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_LOADING));
//是否需要认证
int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) ||
(DefaultUser->flags & USER_FLAG_DISABLED)) &&
!c->authenticated;
if (auth_required) {
/* AUTH and HELLO and no auth modules are valid even in
* non-authenticated state. */
if (!(c->cmd->flags & CMD_NO_AUTH)) {
rejectCommand(c,shared.noautherr);
return C_OK;
}
}
//拒绝不通过acl的命令
int acl_keypos;
int acl_retval = ACLCheckCommandPerm(c,&acl_keypos);
if (acl_retval != ACL_OK) {
addACLLogEntry(c,acl_retval,acl_keypos,NULL);
if (acl_retval == ACL_DENIED_CMD)
rejectCommandFormat(c,
"-NOPERM this user has no permissions to run "
"the '%s' command or its subcommand", c->cmd->name);
else
rejectCommandFormat(c,
"-NOPERM this user has no permissions to access "
"one of the keys used as arguments");
return C_OK;
}
//集群处理
if (server.cluster_enabled &&
!(c->flags & CLIENT_MASTER) &&
!(c->flags & CLIENT_LUA &&
server.lua_caller->flags & CLIENT_MASTER) &&
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
c->cmd->proc != execCommand))
{
int hashslot;
int error_code;
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
&hashslot,&error_code);
if (n == NULL || n != server.cluster->myself) {
if (c->cmd->proc == execCommand) {
discardTransaction(c);
} else {
flagTransaction(c);
}
clusterRedirectClient(c,n,hashslot,error_code);
return C_OK;
}
}
//处理maxmemory,lru有关在内部
if (server.maxmemory && !server.lua_timedout) {
int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;
/* freeMemoryIfNeeded may flush slave output buffers. This may result
* into a slave, that may be the active client, to be freed. */
if (server.current_client == NULL) return C_ERR;
/* It was impossible to free enough memory, and the command the client
* is trying to execute is denied during OOM conditions or the client
* is in MULTI/EXEC context? Error. */
if (out_of_memory &&
(is_denyoom_command ||
(c->flags & CLIENT_MULTI &&
c->cmd->proc != discardCommand)))
{
rejectCommand(c, shared.oomerr);
return C_OK;
}
/* Save out_of_memory result at script start, otherwise if we check OOM
* untill first write within script, memory used by lua stack and
* arguments might interfere. */
if (c->cmd->proc == evalCommand || c->cmd->proc == evalShaCommand) {
server.lua_oom = out_of_memory;
}
}
/* Make sure to use a reasonable amount of memory for client side
* caching metadata. */
if (server.tracking_clients) trackingLimitUsedSlots();
//磁盘问题 拒绝命令
int deny_write_type = writeCommandsDeniedByDiskError();
if (deny_write_type != DISK_ERROR_TYPE_NONE &&
server.masterhost == NULL &&
(is_write_command ||c->cmd->proc == pingCommand))
{
if (deny_write_type == DISK_ERROR_TYPE_RDB)
rejectCommand(c, shared.bgsaveerr);
else
rejectCommandFormat(c,
"-MISCONF Errors writing to the AOF file: %s\r\n",
strerror(server.aof_last_write_errno));
return C_OK;
}
//如果没有足够的slave节点准备好 拒绝命令
if (server.masterhost == NULL &&
server.repl_min_slaves_to_write &&
server.repl_min_slaves_max_lag &&
is_write_command &&
server.repl_good_slaves_count < server.repl_min_slaves_to_write)
{
rejectCommand(c, shared.noreplicaserr);
return C_OK;
}
//如果是只读节点 且不是主节点发送的命令 拒绝
if (server.masterhost && server.repl_slave_ro &&
!(c->flags & CLIENT_MASTER) &&
is_write_command)
{
rejectCommand(c, shared.roslaveerr);
return C_OK;
}
//发布订阅模式不接受其他命令
if ((c->flags & CLIENT_PUBSUB && c->resp == 2) &&
c->cmd->proc != pingCommand &&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand) {
rejectCommandFormat(c,
"Can't execute '%s': only (P)SUBSCRIBE / "
"(P)UNSUBSCRIBE / PING / QUIT are allowed in this context",
c->cmd->name);
return C_OK;
}
/* Only allow commands with flag "t", such as INFO, SLAVEOF and so on,
* when slave-serve-stale-data is no and we are a slave with a broken
* link with master. */
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
server.repl_serve_stale_data == 0 &&
is_denystale_command)
{
rejectCommand(c, shared.masterdownerr);
return C_OK;
}
//加载中拒绝命令
if (server.loading && is_denyloading_command) {
rejectCommand(c, shared.loadingerr);
return C_OK;
}
/* Lua script too slow? Only allow a limited number of commands.
* Note that we need to allow the transactions commands, otherwise clients
* sending a transaction with pipelining without error checking, may have
* the MULTI plus a few initial commands refused, then the timeout
* condition resolves, and the bottom-half of the transaction gets
* executed, see Github PR #7022. */
if (server.lua_timedout &&
c->cmd->proc != authCommand &&
c->cmd->proc != helloCommand &&
c->cmd->proc != replconfCommand &&
c->cmd->proc != multiCommand &&
c->cmd->proc != discardCommand &&
c->cmd->proc != watchCommand &&
c->cmd->proc != unwatchCommand &&
!(c->cmd->proc == shutdownCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
!(c->cmd->proc == scriptCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
{
rejectCommand(c, shared.slowscripterr);
return C_OK;
}
//如果执行过multi 表示处于事务模式 将后续命令加入到事务queue
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
//具体执行代码方法
call(c,CMD_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnKeys();
}
return C_OK;
}
复制代码
processCommand做了什么?
- quit命令处理,直接退出然后标记client
- 校验命令的合法性,arity参数为正数时:表示命令必须等于这个长度;为负数的时候:表示命令长度必须大于等于正数的绝对值。比如set命令arity=-3,表示set命令的长度必须大于等于3
- 检查认证和acl
- 集群情况的处理
- maxmemory情况处理
- 磁盘问题处理
- 主从模式下从节点数量问题太少、只读节点处理
- 发布订阅模式处理
- 事务处理
- 最终调用call执行代码
void call(client *c, int flags) {
long long dirty;
ustime_t start, duration;
int client_old_flags = c->flags;
struct redisCommand *real_cmd = c->cmd;
server.fixed_time_expire++;
/* Send the command to clients in MONITOR mode if applicable.
* Administrative commands are considered too dangerous to be shown. */
if (listLength(server.monitors) &&
!server.loading &&
!(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
{
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}
/* Initialization: clear the flags that must be set by the command on
* demand, and initialize the array for additional commands propagation. */
c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
redisOpArray prev_also_propagate = server.also_propagate;
redisOpArrayInit(&server.also_propagate);
/* Call the command. */
dirty = server.dirty;
updateCachedTime(0);
//执行命令 记录时间
start = server.ustime;
c->cmd->proc(c);
duration = ustime()-start;
dirty = server.dirty-dirty;
if (dirty < 0) dirty = 0;
复制代码
这是call代码的前半段,后半段以后再看,这里最终调用了方法执行具体业务命令,并且计算了执行时间
set命令
跟踪一下set命令的代码,看一下set一个值的过程中到底做了什么?
{"set",setCommand,-3,
"write use-memory @string",
0,NULL,1,1,1,0,0,0},
复制代码
根据table定义到set方法内部
void setCommand(client *c) {
int j;
robj *expire = NULL;
int unit = UNIT_SECONDS;
int flags = OBJ_SET_NO_FLAGS;
//循环解析参数
for (j = 3; j < c->argc; j++) {
char *a = c->argv[j]->ptr;
robj *next = (j == c->argc-1) ? NULL : c->argv[j+1];
//解析nx 只在键不存在时,才对键进行设置操作
if ((a[0] == 'n' || a[0] == 'N') &&
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
!(flags & OBJ_SET_XX))
{
flags |= OBJ_SET_NX;
//解析XX 只在键已经存在时,才对键进行设置操作
} else if ((a[0] == 'x' || a[0] == 'X') &&
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
!(flags & OBJ_SET_NX))
{
flags |= OBJ_SET_XX;
//保留设置前key的生存时间
} else if (!strcasecmp(c->argv[j]->ptr,"KEEPTTL") &&
!(flags & OBJ_SET_EX) && !(flags & OBJ_SET_PX))
{
flags |= OBJ_SET_KEEPTTL;
//解析EX 秒为单位
} else if ((a[0] == 'e' || a[0] == 'E') &&
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
!(flags & OBJ_SET_KEEPTTL) &&
!(flags & OBJ_SET_PX) && next)
{
flags |= OBJ_SET_EX;
unit = UNIT_SECONDS;
expire = next;
j++;
//解析PX 毫秒单位
} else if ((a[0] == 'p' || a[0] == 'P') &&
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
!(flags & OBJ_SET_KEEPTTL) &&
!(flags & OBJ_SET_EX) && next)
{
flags |= OBJ_SET_PX;
unit = UNIT_MILLISECONDS;
expire = next;
j++;
} else {
addReply(c,shared.syntaxerr);
return;
}
}
//对字符串对象编码优化
c->argv[2] = tryObjectEncoding(c->argv[2]);
setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}
复制代码
前面一部分是解析set方法参数,然后设置flags
- nx 只在键不存在的时候才进行set
- xx 只在键存在的时候才进行set
- KEEPTTL 保留之前设置过的过期时间
- EX 秒为单位的过期时间
- PX 毫秒为单位的过期时间
解析了上面的参数过后,会进入一个重要的过程:tryObjectEncoding
robj *tryObjectEncoding(robj *o) {
long value;
sds s = o->ptr;
size_t len;
//确保type是字符串类型
serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);
//确保字符串对象的编码类型为RAW或EMBSTR
if (!sdsEncodedObject(o)) return o;
//refcount大于1为共享对象 不对共享对象进行编码
if (o->refcount > 1) return o;
len = sdslen(s);
//字符串长度<=20 表示在long long范围 可以long编码
if (len <= 20 && string2l(s,len,&value)) {
//value范围在[0-9999) 表示可以使用共享整数对象
if ((server.maxmemory == 0 ||
!(server.maxmemory_policy & MAXMEMORY_FLAG_NO_SHARED_INTEGERS)) &&
value >= 0 &&
value < OBJ_SHARED_INTEGERS)
{
//将自身refCount减少 等于1的时候释放
decrRefCount(o);
incrRefCount(shared.integers[value]);
return shared.integers[value];
} else {
//如果以前是raw格式 修改encoding和指针
if (o->encoding == OBJ_ENCODING_RAW) {
sdsfree(o->ptr);
o->encoding = OBJ_ENCODING_INT;
o->ptr = (void*) value;
return o;
} else if (o->encoding == OBJ_ENCODING_EMBSTR) {
decrRefCount(o);
return createStringObjectFromLongLongForValue(value);
}
}
}
//字符串大于20小于44 使用emb编码
if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) {
robj *emb;
if (o->encoding == OBJ_ENCODING_EMBSTR) return o;
//使用sds字符串 减少原对象的refcount
emb = createEmbeddedStringObject(s,sdslen(s));
decrRefCount(o);
return emb;
}
//无法进行编码 优化sds使用空间
trimStringObjectIfNeeded(o);
return o;
}
复制代码
tryObjectEncoding对字符串进行编码用于优化空间,提一点embstr一次性分配64位内存空间,减去redisObject和sds对象头,再减去结尾的’/0’,一共还剩下44,这一点可以根据之前的文章算一下。
void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
long long milliseconds = 0; /* initialized to avoid any harmness warning */
if (expire) {
//通过expire对象获取milliseconds 失败返回
if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)
return;
if (milliseconds <= 0) {
addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);
return;
}
if (unit == UNIT_SECONDS) milliseconds *= 1000;
}
//对xx和nx参数的判断
if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
(flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))
{
addReply(c, abort_reply ? abort_reply : shared.null[c->resp]);
return;
}
genericSetKey(c,c->db,key,val,flags & OBJ_SET_KEEPTTL,1);
server.dirty++;
if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
"expire",key,c->db->id);
addReply(c, ok_reply ? ok_reply : shared.ok);
}
复制代码
真实的set方法,主要看一下genericSetKey,代码里面的lookupKeyWrite也包含在内
void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, int signal) {
if (lookupKeyWrite(db,key) == NULL) {
dbAdd(db,key,val);
} else {
dbOverwrite(db,key,val);
}
incrRefCount(val);
if (!keepttl) removeExpire(db,key);
if (signal) signalModifiedKey(c,db,key);
}
复制代码
根据lookupKeyWrite查询的结果来判断是新增key还是修改值,并且增加refcount的值
robj *lookupKeyWrite(redisDb *db, robj *key) {
return lookupKeyWriteWithFlags(db, key, LOOKUP_NONE);
}
robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) {
expireIfNeeded(db,key);
return lookupKey(db,key,flags);
}
int expireIfNeeded(redisDb *db, robj *key) {
//key未过期
if (!keyIsExpired(db,key)) return 0;
//如果是从库只返回装备 不涉及到删除key
if (server.masterhost != NULL) return 1;
server.stat_expiredkeys++;
//命令传播
propagateExpire(db,key,server.lazyfree_lazy_expire);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",key,db->id);
//延迟删除还是直接删除
int retval = server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
dbSyncDelete(db,key);
if (retval) signalModifiedKey(NULL,db,key);
return retval;
}
复制代码
lookupKeyWriteWithFlags会执行expireIfNeeded逻辑,expireIfNeeded部分逻辑如上注释,这段代码主要是处理过期key的问题,过期策略以后再讲
robj *lookupKey(redisDb *db, robj *key, int flags) {
dictEntry *de = dictFind(db->dict,key->ptr);
if (de) {
robj *val = dictGetVal(de);
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
updateLFU(val);
} else {
val->lru = LRU_CLOCK();
}
}
return val;
} else {
return NULL;
}
}
复制代码
这里通过dictFind获取到以前的对象,如果有一个子进程就不进行lru/lfu的跟新,由于LRU和LFU的算法不一样,所以跟新策略不一样,这里放到后文讲。
genericSetKey(c,c->db,key,val,flags & OBJ_SET_KEEPTTL,1);
server.dirty++;
if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
"expire",key,c->db->id);
复制代码
set执行后会增加server.dirty,表示上次保存前数据新增了一条,如果有过期时间会调用setExpire来设置过期时间,后续会触发通知。
void setExpire(client *c, redisDb *db, robj *key, long long when) {
dictEntry *kde, *de;
/* Reuse the sds from the main dict in the expire dict */
kde = dictFind(db->dict,key->ptr);
serverAssertWithInfo(NULL,key,kde != NULL);
de = dictAddOrFind(db->expires,dictGetKey(kde));
dictSetSignedIntegerVal(de,when);
int writable_slave = server.masterhost && server.repl_slave_ro == 0;
if (c && writable_slave && !(c->flags & CLIENT_MASTER))
rememberSlaveKeyWithExpire(db,key);
}
复制代码
从这里可以发现expires字典里面和dict字典指向的是同一个对象。
总结:set方法抛开对nx等参数的处理,主要逻辑是对redisBd的dict进行设置,在设置key的过程中会验证原key是否过期,并且跟新lru/lfu信息
get命令
void getCommand(client *c) {
getGenericCommand(c);
}
int getGenericCommand(client *c) {
robj *o;
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL)
return C_OK;
if (o->type != OBJ_STRING) {
addReply(c,shared.wrongtypeerr);
return C_ERR;
} else {
addReplyBulk(c,o);
return C_OK;
}
}
robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
robj *o = lookupKeyRead(c->db, key);
if (!o) addReply(c,reply);
return o;
}
robj *lookupKeyRead(redisDb *db, robj *key) {
return lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
}
复制代码
主要调用lookupKeyReadOrReply,继续调用lookupKeyRead——>lookupKeyReadWithFlags
robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
robj *val;
if (expireIfNeeded(db,key) == 1) {
if (server.masterhost == NULL) {
server.stat_keyspace_misses++;
notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
return NULL;
}
if (server.current_client &&
server.current_client != server.master &&
server.current_client->cmd &&
server.current_client->cmd->flags & CMD_READONLY)
{
server.stat_keyspace_misses++;
notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
return NULL;
}
}
val = lookupKey(db,key,flags);
if (val == NULL) {
server.stat_keyspace_misses++;
notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
}
else
server.stat_keyspace_hits++;
return val;
}
复制代码
expireIfNeeded的出现和set方法一样,都是判断key过期的行为,这里对主从进行了判断。
因为如果设置了主从的读写分离,从库是无法删除过期key的,只能被动等待主库的命令,所以当查询到key过期的时候,只有主库和设置只读的从库能够安全的返回NULL,后续还是得调用lookupKey去dict里面查询数据,并且跟新lru,和set是一致的。
monitor命令
monitor命令是将当前客户端变成一个监视器,能够看到redis服务器接下来执行的所有命令
void monitorCommand(client *c) {
/* ignore MONITOR if already slave or in monitor mode */
if (c->flags & CLIENT_SLAVE) return;
c->flags |= (CLIENT_SLAVE|CLIENT_MONITOR);
listAddNodeTail(server.monitors,c);
addReply(c,shared.ok);
}
复制代码
monitors是一个list,用于存放客户端,同时设置flags。
if (listLength(server.monitors) &&
!server.loading &&
!(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
{
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}
复制代码
在call方法中有这一行代码,执行具体方法之前会判断是否有客户端订阅,然后调用replicationFeedMonitors。从这里可以看出来,monitor命令是不管服务器有没有执行成功,都会先发送给客户端
listRewind(monitors,&li);
while((ln = listNext(&li))) {
client *monitor = ln->value;
addReply(monitor,cmdobj);
}
复制代码
replicationFeedMonitors核心就是遍历list,依次发给客户端命令。
总结
从上诉源码来看,get和set命令中可以了解到以下几点:
- redis主要是由一个hashmap组成,过期key保存在expiresDict中
- set和get命令过程中都会查询key是否过期,针对主从节点的不同执行不同的策略。
- redis在set和get命令过程中都跟新了key的lru信息,但并不像传统的lru算法一样维护一个fifo队列
- 执行命令的过程中会处理各种情况:monitor、主从、事务等。