Redis6.0.6源码阅读——对象、命令

前言

前面看了redis几个常用的数据结构源码,这篇文章从几个常用命令的角度出发,通过命令的执行流程来看一下redis在执行命令的过程中做了什么。

正文

redisObject和redisDB

redisObject

在介绍其他内容之前,先看一redisObject,Redis构造了对象系统,可以根据对象的不同来确定执行不同的命令,还可以优化对象提高使用效率

typedef struct redisObject {
    unsigned type:4;
    unsigned encoding:4;
    unsigned lru:LRU_BITS; 
    int refcount;
    void *ptr;
} robj;
复制代码

参数含义:

  • type:类型由以下5种来表示,对应了五个基础类型
#define OBJ_STRING 0    /* 字符串对象. */
#define OBJ_LIST 1      /* List对象. */
#define OBJ_SET 2       /* Set对象. */
#define OBJ_ZSET 3      /* zset对象. */
#define OBJ_HASH 4      /* Hash对象 */
复制代码
  • encoding:记录了对象所使用的编码
#define OBJ_ENCODING_RAW 0     /* 简单动态字符串 sds */
#define OBJ_ENCODING_INT 1     /* long类型的整数*/
#define OBJ_ENCODING_HT 2      /* hashtable */
#define OBJ_ENCODING_ZIPLIST 5 /* 压缩链表 */
#define OBJ_ENCODING_INTSET 6  /* 整数集合 */
#define OBJ_ENCODING_SKIPLIST 7  /* 跳表 */
#define OBJ_ENCODING_EMBSTR 8  /* embstr编码的sds */
#define OBJ_ENCODING_QUICKLIST 9 /* quicklist */
#define OBJ_ENCODING_STREAM 10 /* 基数树 */
复制代码
  • lru:记录了对象的访问时间 用于lru算法
  • refcount:记录了对象的引用计数 >1表示是共享对象
  • ptr:实际指针

RedisObject被使用在各个地方,例如set key val命令,在源码中key和val都是用redisObject来表示的,具体细节看后文解析。

redisBD

typedef struct redisDb {
    dict *dict;                 /* The keyspace for this DB */
    dict *expires;              /* Timeout of keys with a timeout set */
    dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP)*/
    dict *ready_keys;           /* Blocked keys that received a PUSH */
    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
    int id;                     /* Database ID */
    long long avg_ttl;          /* Average TTL, just for stats */
    unsigned long expires_cursor; /* Cursor of the active expire cycle. */
    list *defrag_later;         /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;

struct redisServer{
	redisDb *db;
}
复制代码

可以看到在redis中redisDb由数组组成,使用的时候可以选择db,redisDb就是几个字典,将过期key和watchedKey维护在另外的字典里面,如果一个key配置了过期时间,会同时存在dict和expires字典中。

命令

struct redisCommand {
    //名称
    char *name;
    //具体执行函数
    redisCommandProc *proc;
    //参数校验 
    int arity;
    //描述
    char *sflags;
    uint64_t flags;
    redisGetKeysProc *getkeys_proc;
    /* What keys should be loaded in background when calling this command? */
    int firstkey; /* The first argument that's a key (0 = no keys) */
    int lastkey;  /* The last argument that's a key */
    int keystep;  /* The step between first and last key */
    long long microseconds, calls;
    int id; 
};
复制代码

以上是方法结构体

struct redisCommand redisCommandTable[] = {
    {"module",moduleCommand,-2,
     "admin no-script",
     0,NULL,0,0,0,0,0,0},

    {"get",getCommand,2,
     "read-only fast @string",
     0,NULL,1,1,1,0,0,0},

    /* Note that we can't flag set as fast, since it may perform an
     * implicit DEL of a large key. */
    {"set",setCommand,-3,
     "write use-memory @string",
     0,NULL,1,1,1,0,0,0},

    {"setnx",setnxCommand,3,
     "write use-memory fast @string",
     0,NULL,1,1,1,0,0,0},

    {"setex",setexCommand,4,
     "write use-memory @string",
     0,NULL,1,1,1,0,0,0},

复制代码

这是内置方法的数组,可以看到get key对应的函数是getCommand,参数指定为2。

initServerConfig {
....
	server.commands = dictCreate(&commandTableDictType,NULL);
	populateCommandTable();
....
}

void populateCommandTable(void) {
	dictAdd(server.commands, sdsnew(c->name), c);
}
复制代码

服务配置初始化的时候会创建一个字典,将name对应方法结构体加入字典中,形成了一个方法的hashMap

具体执行命令是processCommand的方法,该方法有点长

int processCommand(client *c) {
    //模块的命令的过滤器
    moduleCallCommandFilters(c);

    //quit命令单独处理
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        addReply(c,shared.ok);
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        return C_ERR;
    }

    //通过server.commands来查找命令对应的方法
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    //找不到命令
    if (!c->cmd) {
        sds args = sdsempty();
        int i;
        for (i=1; i < c->argc && sdslen(args) < 128; i++)
            args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
        rejectCommandFormat(c,"unknown command `%s`, with args beginning with: %s",
            (char*)c->argv[0]->ptr, args);
        sdsfree(args);
        return C_OK;
    //检验命令的合法性
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < -c->cmd->arity)) {
        rejectCommandFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return C_OK;
    }

    int is_write_command = (c->cmd->flags & CMD_WRITE) ||
                           (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
    int is_denyoom_command = (c->cmd->flags & CMD_DENYOOM) ||
                             (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_DENYOOM));
    int is_denystale_command = !(c->cmd->flags & CMD_STALE) ||
                               (c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_STALE));
    int is_denyloading_command = !(c->cmd->flags & CMD_LOADING) ||
                                 (c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_LOADING));

    //是否需要认证
    int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) ||
                          (DefaultUser->flags & USER_FLAG_DISABLED)) &&
                        !c->authenticated;
    if (auth_required) {
        /* AUTH and HELLO and no auth modules are valid even in
         * non-authenticated state. */
        if (!(c->cmd->flags & CMD_NO_AUTH)) {
            rejectCommand(c,shared.noautherr);
            return C_OK;
        }
    }

    //拒绝不通过acl的命令
    int acl_keypos;
    int acl_retval = ACLCheckCommandPerm(c,&acl_keypos);
    if (acl_retval != ACL_OK) {
        addACLLogEntry(c,acl_retval,acl_keypos,NULL);
        if (acl_retval == ACL_DENIED_CMD)
            rejectCommandFormat(c,
                "-NOPERM this user has no permissions to run "
                "the '%s' command or its subcommand", c->cmd->name);
        else
            rejectCommandFormat(c,
                "-NOPERM this user has no permissions to access "
                "one of the keys used as arguments");
        return C_OK;
    }

    //集群处理
    if (server.cluster_enabled &&
        !(c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_LUA &&
          server.lua_caller->flags & CLIENT_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
          c->cmd->proc != execCommand))
    {
        int hashslot;
        int error_code;
        clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
                                        &hashslot,&error_code);
        if (n == NULL || n != server.cluster->myself) {
            if (c->cmd->proc == execCommand) {
                discardTransaction(c);
            } else {
                flagTransaction(c);
            }
            clusterRedirectClient(c,n,hashslot,error_code);
            return C_OK;
        }
    }

    //处理maxmemory,lru有关在内部
    if (server.maxmemory && !server.lua_timedout) {
        int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;
        /* freeMemoryIfNeeded may flush slave output buffers. This may result
         * into a slave, that may be the active client, to be freed. */
        if (server.current_client == NULL) return C_ERR;

        /* It was impossible to free enough memory, and the command the client
         * is trying to execute is denied during OOM conditions or the client
         * is in MULTI/EXEC context? Error. */
        if (out_of_memory &&
            (is_denyoom_command ||
             (c->flags & CLIENT_MULTI &&
              c->cmd->proc != discardCommand)))
        {
            rejectCommand(c, shared.oomerr);
            return C_OK;
        }

        /* Save out_of_memory result at script start, otherwise if we check OOM
         * untill first write within script, memory used by lua stack and
         * arguments might interfere. */
        if (c->cmd->proc == evalCommand || c->cmd->proc == evalShaCommand) {
            server.lua_oom = out_of_memory;
        }
    }

    /* Make sure to use a reasonable amount of memory for client side
     * caching metadata. */
    if (server.tracking_clients) trackingLimitUsedSlots();

    //磁盘问题 拒绝命令
    int deny_write_type = writeCommandsDeniedByDiskError();
    if (deny_write_type != DISK_ERROR_TYPE_NONE &&
        server.masterhost == NULL &&
        (is_write_command ||c->cmd->proc == pingCommand))
    {
        if (deny_write_type == DISK_ERROR_TYPE_RDB)
            rejectCommand(c, shared.bgsaveerr);
        else
            rejectCommandFormat(c,
                "-MISCONF Errors writing to the AOF file: %s\r\n",
                strerror(server.aof_last_write_errno));
        return C_OK;
    }

    //如果没有足够的slave节点准备好 拒绝命令
    if (server.masterhost == NULL &&
        server.repl_min_slaves_to_write &&
        server.repl_min_slaves_max_lag &&
        is_write_command &&
        server.repl_good_slaves_count < server.repl_min_slaves_to_write)
    {
        rejectCommand(c, shared.noreplicaserr);
        return C_OK;
    }

    //如果是只读节点 且不是主节点发送的命令 拒绝
    if (server.masterhost && server.repl_slave_ro &&
        !(c->flags & CLIENT_MASTER) &&
        is_write_command)
    {
        rejectCommand(c, shared.roslaveerr);
        return C_OK;
    }

    //发布订阅模式不接受其他命令
    if ((c->flags & CLIENT_PUBSUB && c->resp == 2) &&
        c->cmd->proc != pingCommand &&
        c->cmd->proc != subscribeCommand &&
        c->cmd->proc != unsubscribeCommand &&
        c->cmd->proc != psubscribeCommand &&
        c->cmd->proc != punsubscribeCommand) {
        rejectCommandFormat(c,
            "Can't execute '%s': only (P)SUBSCRIBE / "
            "(P)UNSUBSCRIBE / PING / QUIT are allowed in this context",
            c->cmd->name);
        return C_OK;
    }

    /* Only allow commands with flag "t", such as INFO, SLAVEOF and so on,
     * when slave-serve-stale-data is no and we are a slave with a broken
     * link with master. */
    if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
        server.repl_serve_stale_data == 0 &&
        is_denystale_command)
    {
        rejectCommand(c, shared.masterdownerr);
        return C_OK;
    }

    //加载中拒绝命令
    if (server.loading && is_denyloading_command) {
        rejectCommand(c, shared.loadingerr);
        return C_OK;
    }

    /* Lua script too slow? Only allow a limited number of commands.
     * Note that we need to allow the transactions commands, otherwise clients
     * sending a transaction with pipelining without error checking, may have
     * the MULTI plus a few initial commands refused, then the timeout
     * condition resolves, and the bottom-half of the transaction gets
     * executed, see Github PR #7022. */
    if (server.lua_timedout &&
          c->cmd->proc != authCommand &&
          c->cmd->proc != helloCommand &&
          c->cmd->proc != replconfCommand &&
          c->cmd->proc != multiCommand &&
          c->cmd->proc != discardCommand &&
          c->cmd->proc != watchCommand &&
          c->cmd->proc != unwatchCommand &&
        !(c->cmd->proc == shutdownCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
        !(c->cmd->proc == scriptCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
    {
        rejectCommand(c, shared.slowscripterr);
        return C_OK;
    }

    //如果执行过multi 表示处于事务模式 将后续命令加入到事务queue
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        //具体执行代码方法
        call(c,CMD_CALL_FULL);
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys))
            handleClientsBlockedOnKeys();
    }
    return C_OK;
}
复制代码

processCommand做了什么?

  • quit命令处理,直接退出然后标记client
  • 校验命令的合法性,arity参数为正数时:表示命令必须等于这个长度;为负数的时候:表示命令长度必须大于等于正数的绝对值。比如set命令arity=-3,表示set命令的长度必须大于等于3
  • 检查认证和acl
  • 集群情况的处理
  • maxmemory情况处理
  • 磁盘问题处理
  • 主从模式下从节点数量问题太少、只读节点处理
  • 发布订阅模式处理
  • 事务处理
  • 最终调用call执行代码
void call(client *c, int flags) {
    long long dirty;
    ustime_t start, duration;
    int client_old_flags = c->flags;
    struct redisCommand *real_cmd = c->cmd;

    server.fixed_time_expire++;

    /* Send the command to clients in MONITOR mode if applicable.
     * Administrative commands are considered too dangerous to be shown. */
    if (listLength(server.monitors) &&
        !server.loading &&
        !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
    {
        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
    }

    /* Initialization: clear the flags that must be set by the command on
     * demand, and initialize the array for additional commands propagation. */
    c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
    redisOpArray prev_also_propagate = server.also_propagate;
    redisOpArrayInit(&server.also_propagate);

    /* Call the command. */
    dirty = server.dirty;
    updateCachedTime(0);
    //执行命令 记录时间
    start = server.ustime;
    c->cmd->proc(c);
    duration = ustime()-start;
    dirty = server.dirty-dirty;
    if (dirty < 0) dirty = 0;
复制代码

这是call代码的前半段,后半段以后再看,这里最终调用了方法执行具体业务命令,并且计算了执行时间

set命令

跟踪一下set命令的代码,看一下set一个值的过程中到底做了什么?

 {"set",setCommand,-3,
     "write use-memory @string",
     0,NULL,1,1,1,0,0,0},
复制代码

根据table定义到set方法内部

void setCommand(client *c) {
    int j;
    robj *expire = NULL;
    int unit = UNIT_SECONDS;
    int flags = OBJ_SET_NO_FLAGS;

    //循环解析参数
    for (j = 3; j < c->argc; j++) {
        char *a = c->argv[j]->ptr;
        robj *next = (j == c->argc-1) ? NULL : c->argv[j+1];

        //解析nx 只在键不存在时,才对键进行设置操作
        if ((a[0] == 'n' || a[0] == 'N') &&
            (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
            !(flags & OBJ_SET_XX))
        {
            flags |= OBJ_SET_NX;
        //解析XX 只在键已经存在时,才对键进行设置操作
        } else if ((a[0] == 'x' || a[0] == 'X') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
                   !(flags & OBJ_SET_NX))
        {
            flags |= OBJ_SET_XX;
        //保留设置前key的生存时间
        } else if (!strcasecmp(c->argv[j]->ptr,"KEEPTTL") &&
                   !(flags & OBJ_SET_EX) && !(flags & OBJ_SET_PX))
        {
            flags |= OBJ_SET_KEEPTTL;
        //解析EX 秒为单位
        } else if ((a[0] == 'e' || a[0] == 'E') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
                   !(flags & OBJ_SET_KEEPTTL) &&
                   !(flags & OBJ_SET_PX) && next)
        {
            flags |= OBJ_SET_EX;
            unit = UNIT_SECONDS;
            expire = next;
            j++;
        //解析PX 毫秒单位
        } else if ((a[0] == 'p' || a[0] == 'P') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
                   !(flags & OBJ_SET_KEEPTTL) &&
                   !(flags & OBJ_SET_EX) && next)
        {
            flags |= OBJ_SET_PX;
            unit = UNIT_MILLISECONDS;
            expire = next;
            j++;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }
    //对字符串对象编码优化
    c->argv[2] = tryObjectEncoding(c->argv[2]);
    setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}
复制代码

前面一部分是解析set方法参数,然后设置flags

  • nx 只在键不存在的时候才进行set
  • xx 只在键存在的时候才进行set
  • KEEPTTL 保留之前设置过的过期时间
  • EX 秒为单位的过期时间
  • PX 毫秒为单位的过期时间

解析了上面的参数过后,会进入一个重要的过程:tryObjectEncoding

robj *tryObjectEncoding(robj *o) {
    long value;
    sds s = o->ptr;
    size_t len;

    //确保type是字符串类型
    serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);

    //确保字符串对象的编码类型为RAW或EMBSTR
    if (!sdsEncodedObject(o)) return o;


     //refcount大于1为共享对象 不对共享对象进行编码
     if (o->refcount > 1) return o;


    len = sdslen(s);
    //字符串长度<=20 表示在long long范围 可以long编码
    if (len <= 20 && string2l(s,len,&value)) {
        //value范围在[0-9999) 表示可以使用共享整数对象
        if ((server.maxmemory == 0 ||
            !(server.maxmemory_policy & MAXMEMORY_FLAG_NO_SHARED_INTEGERS)) &&
            value >= 0 &&
            value < OBJ_SHARED_INTEGERS)
        {
            //将自身refCount减少 等于1的时候释放
            decrRefCount(o);
            incrRefCount(shared.integers[value]);
            return shared.integers[value];
        } else {
            //如果以前是raw格式 修改encoding和指针
            if (o->encoding == OBJ_ENCODING_RAW) {
                sdsfree(o->ptr);
                o->encoding = OBJ_ENCODING_INT;
                o->ptr = (void*) value;
                return o;
            } else if (o->encoding == OBJ_ENCODING_EMBSTR) {
                decrRefCount(o);
                return createStringObjectFromLongLongForValue(value);
            }
        }
    }

    //字符串大于20小于44 使用emb编码
    if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) {
        robj *emb;

        if (o->encoding == OBJ_ENCODING_EMBSTR) return o;
        //使用sds字符串 减少原对象的refcount
        emb = createEmbeddedStringObject(s,sdslen(s));
        decrRefCount(o);
        return emb;
    }

    //无法进行编码 优化sds使用空间
    trimStringObjectIfNeeded(o);

    return o;
}
复制代码

tryObjectEncoding对字符串进行编码用于优化空间,提一点embstr一次性分配64位内存空间,减去redisObject和sds对象头,再减去结尾的’/0’,一共还剩下44,这一点可以根据之前的文章算一下。

void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
    long long milliseconds = 0; /* initialized to avoid any harmness warning */

    if (expire) {
        //通过expire对象获取milliseconds 失败返回
        if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)
            return;
        if (milliseconds <= 0) {
            addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);
            return;
        }
        if (unit == UNIT_SECONDS) milliseconds *= 1000;
    }
    //对xx和nx参数的判断
    if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
        (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))
    {
        addReply(c, abort_reply ? abort_reply : shared.null[c->resp]);
        return;
    }
    genericSetKey(c,c->db,key,val,flags & OBJ_SET_KEEPTTL,1);
    server.dirty++;
    if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
    notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
    if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
        "expire",key,c->db->id);
    addReply(c, ok_reply ? ok_reply : shared.ok);
}
复制代码

真实的set方法,主要看一下genericSetKey,代码里面的lookupKeyWrite也包含在内

void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, int signal) {
    if (lookupKeyWrite(db,key) == NULL) {
        dbAdd(db,key,val);
    } else {
        dbOverwrite(db,key,val);
    }
    incrRefCount(val);
    if (!keepttl) removeExpire(db,key);
    if (signal) signalModifiedKey(c,db,key);
}
复制代码

根据lookupKeyWrite查询的结果来判断是新增key还是修改值,并且增加refcount的值

robj *lookupKeyWrite(redisDb *db, robj *key) {
    return lookupKeyWriteWithFlags(db, key, LOOKUP_NONE);
}

robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) {
    expireIfNeeded(db,key);
    return lookupKey(db,key,flags);
}

int expireIfNeeded(redisDb *db, robj *key) {
    //key未过期
    if (!keyIsExpired(db,key)) return 0;
    
    //如果是从库只返回装备 不涉及到删除key
    if (server.masterhost != NULL) return 1;
    
    server.stat_expiredkeys++;
    //命令传播
    propagateExpire(db,key,server.lazyfree_lazy_expire);
    notifyKeyspaceEvent(NOTIFY_EXPIRED,
        "expired",key,db->id);
    //延迟删除还是直接删除
    int retval = server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
                                               dbSyncDelete(db,key);
    if (retval) signalModifiedKey(NULL,db,key);
    return retval;
}
复制代码

lookupKeyWriteWithFlags会执行expireIfNeeded逻辑,expireIfNeeded部分逻辑如上注释,这段代码主要是处理过期key的问题,过期策略以后再讲

robj *lookupKey(redisDb *db, robj *key, int flags) {
    dictEntry *de = dictFind(db->dict,key->ptr);
    if (de) {
        robj *val = dictGetVal(de);
            if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
                updateLFU(val);
            } else {
                val->lru = LRU_CLOCK();
            }
        }
        return val;
    } else {
        return NULL;
    }
}
复制代码

这里通过dictFind获取到以前的对象,如果有一个子进程就不进行lru/lfu的跟新,由于LRU和LFU的算法不一样,所以跟新策略不一样,这里放到后文讲。

  genericSetKey(c,c->db,key,val,flags & OBJ_SET_KEEPTTL,1);
    server.dirty++;
    if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
    notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
    if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
        "expire",key,c->db->id);
复制代码

set执行后会增加server.dirty,表示上次保存前数据新增了一条,如果有过期时间会调用setExpire来设置过期时间,后续会触发通知。

void setExpire(client *c, redisDb *db, robj *key, long long when) {
    dictEntry *kde, *de;

    /* Reuse the sds from the main dict in the expire dict */
    kde = dictFind(db->dict,key->ptr);
    serverAssertWithInfo(NULL,key,kde != NULL);
    de = dictAddOrFind(db->expires,dictGetKey(kde));
    dictSetSignedIntegerVal(de,when);

    int writable_slave = server.masterhost && server.repl_slave_ro == 0;
    if (c && writable_slave && !(c->flags & CLIENT_MASTER))
        rememberSlaveKeyWithExpire(db,key);
}
复制代码

从这里可以发现expires字典里面和dict字典指向的是同一个对象。

总结:set方法抛开对nx等参数的处理,主要逻辑是对redisBd的dict进行设置,在设置key的过程中会验证原key是否过期,并且跟新lru/lfu信息

get命令

void getCommand(client *c) {
    getGenericCommand(c);
}

int getGenericCommand(client *c) {
    robj *o;

    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL)
        return C_OK;

    if (o->type != OBJ_STRING) {
        addReply(c,shared.wrongtypeerr);
        return C_ERR;
    } else {
        addReplyBulk(c,o);
        return C_OK;
    }
}

robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
    robj *o = lookupKeyRead(c->db, key);
    if (!o) addReply(c,reply);
    return o;
}

robj *lookupKeyRead(redisDb *db, robj *key) {
    return lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
}
复制代码

主要调用lookupKeyReadOrReply,继续调用lookupKeyRead——>lookupKeyReadWithFlags

robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
    robj *val;

    if (expireIfNeeded(db,key) == 1) {
        if (server.masterhost == NULL) {
            server.stat_keyspace_misses++;
            notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
            return NULL;
        }

        if (server.current_client &&
            server.current_client != server.master &&
            server.current_client->cmd &&
            server.current_client->cmd->flags & CMD_READONLY)
        {
            server.stat_keyspace_misses++;
            notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
            return NULL;
        }
    }
    val = lookupKey(db,key,flags);
    if (val == NULL) {
        server.stat_keyspace_misses++;
        notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
    }
    else
        server.stat_keyspace_hits++;
    return val;
}
复制代码

expireIfNeeded的出现和set方法一样,都是判断key过期的行为,这里对主从进行了判断。

因为如果设置了主从的读写分离,从库是无法删除过期key的,只能被动等待主库的命令,所以当查询到key过期的时候,只有主库和设置只读的从库能够安全的返回NULL,后续还是得调用lookupKey去dict里面查询数据,并且跟新lru,和set是一致的。

monitor命令

monitor命令是将当前客户端变成一个监视器,能够看到redis服务器接下来执行的所有命令

void monitorCommand(client *c) {
    /* ignore MONITOR if already slave or in monitor mode */
    if (c->flags & CLIENT_SLAVE) return;

    c->flags |= (CLIENT_SLAVE|CLIENT_MONITOR);
    listAddNodeTail(server.monitors,c);
    addReply(c,shared.ok);
}
复制代码

monitors是一个list,用于存放客户端,同时设置flags。

 if (listLength(server.monitors) &&
        !server.loading &&
        !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
    {
        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
    }
复制代码

在call方法中有这一行代码,执行具体方法之前会判断是否有客户端订阅,然后调用replicationFeedMonitors。从这里可以看出来,monitor命令是不管服务器有没有执行成功,都会先发送给客户端

listRewind(monitors,&li);
while((ln = listNext(&li))) {
        client *monitor = ln->value;
        addReply(monitor,cmdobj);
    }
复制代码

replicationFeedMonitors核心就是遍历list,依次发给客户端命令。

总结

从上诉源码来看,get和set命令中可以了解到以下几点:

  • redis主要是由一个hashmap组成,过期key保存在expiresDict中
  • set和get命令过程中都会查询key是否过期,针对主从节点的不同执行不同的策略。
  • redis在set和get命令过程中都跟新了key的lru信息,但并不像传统的lru算法一样维护一个fifo队列
  • 执行命令的过程中会处理各种情况:monitor、主从、事务等。
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享