Redis 集群分析- Cluster (集群) 下的操作的变化

Redis 是如何处理请求的?

server.c
在统一处理请求时,会判断出集群模式,进行处理

int processCommand(client *c) {
    ...
    /* 如何开启了集群模式,那么这里进行转向操作。
     * 不过,如果是以下情况,那么节点不进行转向:
     * 1) 命令发生这是本节点
     * 2) 命令没有key参数
     */
    
    // 集群模下,根据 hashslot 找到对应的redis节点处理
    if (server.cluster_enabled &&
        !(c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_LUA &&
          server.lua_caller->flags & CLIENT_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
    {
        int hashslot;
        // 集群已下线
        if (server.cluster->state != CLUSTER_OK) {
            flagTransaction(c);
            clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);
            return C_OK;
            // 集群运行正常
        } else {
            int error_code;
            clusterNode *n = getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, &error_code);
            // 不能多建操作
            if (n == NULL) {
                flagTransaction(c);
                if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) {
                    addReplySds(c, sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
                } else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) {
                    /* The request spawns mutliple keys in the same slot,
                     * but the slot is not "stable" currently as there is
                     * a migration or import in progress. */
                    addReplySds(c, sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
                } else {
                    redisPanic("getNodeByQuery() unknown error.");
                }
                return REDIS_OK;
                // 命令针对的槽和键不是本节点处理,进行转发
            } else if (n != server.cluster->myself) {
                flagTransaction(c);
                // -<ASK or MOVED> <slot> <ip>:<port>
                // 例如 -ASK 10086 127.0.0.1:12345
                addReplySds(c, sdscatprintf(sdsempty(),
                                            "-%s %d %s:%d\r\n",
                                            (error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
                                            hashslot, n->ip, n->port));
                return REDIS_OK;
            }
            // 如果执行到这里,说明键 key 所在的槽由本节点处理
            // 或者客户端执行的是无参数命令
        }
    }
    ...
}
复制代码

cluster.c
查找key对应的redis节点,并返回节点

clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
    // 初始化为 NULL ,
    // 如果输入命令是无参数命令,那么 n 就会继续为 NULL
    clusterNode *n = NULL;
    robj *firstkey = NULL;
    int multiple_keys = 0;
    multiState *ms, _ms;
    multiCmd mc;
    int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;

    /* Set error code optimistically for the base case. */
    if (error_code) *error_code = REDIS_CLUSTER_REDIR_NONE;

    /* We handle all the cases as if they were EXEC commands, so we have
     * a common code path for everything */
    // 集群可以执行事务,
    // 但必须确保事务中的所有命令都是针对某个相同的键进行的
    // 这个 if 和接下来的 for 进行的就是这一合法性检测
    if (cmd->proc == execCommand) {
        /* If REDIS_MULTI flag is not set EXEC is just going to return an
         * error. */
        if (!(c->flags & REDIS_MULTI)) return myself;
        ms = &c->mstate;
    } else {
        /* In order to have a single codepath create a fake Multi State
         * structure if the client is not in MULTI/EXEC state, this way
         * we have a single codepath below. */
        ms = &_ms;
        _ms.commands = &mc;
        _ms.count = 1;
        mc.argv = argv;
        mc.argc = argc;
        mc.cmd = cmd;
    }

    /* Check that all the keys are in the same hash slot, and obtain this
     * slot and the node associated. */
    for (i = 0; i < ms->count; i++) {
        struct redisCommand *mcmd;
        robj **margv;
        int margc, *keyindex, numkeys, j;

        mcmd = ms->commands[i].cmd;
        margc = ms->commands[i].argc;
        margv = ms->commands[i].argv;

        // 定位命令的键位置
        keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
        // 遍历命令中的所有键
        for (j = 0; j < numkeys; j++) {
            robj *thiskey = margv[keyindex[j]];
            // 计算hashSolot,crc16 算法
            int thisslot = keyHashSlot((char*)thiskey->ptr,
                                       sdslen(thiskey->ptr));

            if (firstkey == NULL) {
                // 这是事务中第一个被处理的key
                // 所以说redis是只是通过第一个key找到对应的slot哈希槽
                firstkey = thiskey;
                slot = thisslot;
                n = server.cluster->slots[slot];
                redisAssertWithInfo(c,firstkey,n != NULL);
                /* If we are migrating or importing this slot, we need to check
                 * if we have all the keys in the request (the only way we
                 * can safely serve the request, otherwise we return a TRYAGAIN
                 * error). To do so we set the importing/migrating state and
                 * increment a counter for every missing key. */
                if (n == myself &&
                    server.cluster->migrating_slots_to[slot] != NULL)
                {
                    migrating_slot = 1;
                } else if (server.cluster->importing_slots_from[slot] != NULL) {
                    importing_slot = 1;
                }
                // 非 firstkey 
            } else {   
             // 但是每个key对会被检查。
             // firstkey,thiskey 不一致时
                if (!equalStringObjects(firstkey,thiskey)) {
                    // 当两个槽不一样时,执行失败
                    if (slot != thisslot) {
                        getKeysFreeResult(keyindex);
                        if (error_code)
                            *error_code = REDIS_CLUSTER_REDIR_CROSS_SLOT;
                        return NULL;
                    } else {
                         // 是否有复合的key
                        multiple_keys = 1;
                    }
                }
            }

            /* Migarting / Improrting slot? Count keys we don't have. */
            if ((migrating_slot || importing_slot) &&
                lookupKeyRead(&server.db[0],thiskey) == NULL)
            {
                missing_keys++;
            }
        }
        getKeysFreeResult(keyindex);
    }

    /* No key at all in command? then we can serve the request
     * without redirections or errors. */
    if (n == NULL) return myself;

    /* Return the hashslot by reference. */
    if (hashslot) *hashslot = slot;

    /* This request is about a slot we are migrating into another instance?
     * Then if we have all the keys. */

    /* If we don't have all the keys and we are migrating the slot, send
     * an ASK redirection. */
    if (migrating_slot && missing_keys) {
        if (error_code) *error_code = REDIS_CLUSTER_REDIR_ASK;
        return server.cluster->migrating_slots_to[slot];
    }

    /* If we are receiving the slot, and the client correctly flagged the
     * request as "ASKING", we can serve the request. However if the request
     * involves multiple keys and we don't have them all, the only option is
     * to send a TRYAGAIN error. */
    if (importing_slot &&
        (c->flags & REDIS_ASKING || cmd->flags & REDIS_CMD_ASKING))
    {
        if (multiple_keys && missing_keys) {
            if (error_code) *error_code = REDIS_CLUSTER_REDIR_UNSTABLE;
            return NULL;
        } else {
            return myself;
        }
    }

    /* Handle the read-only client case reading from a slave: if this
     * node is a slave and the request is about an hash slot our master
     * is serving, we can reply without redirection. */
    if (c->flags & REDIS_READONLY &&
        cmd->flags & REDIS_CMD_READONLY &&
        nodeIsSlave(myself) &&
        myself->slaveof == n)
    {
        return myself;
    }

    /* Base case: just return the right node. However if this node is not
     * myself, set error_code to MOVED since we need to issue a rediretion. */
    if (n != myself && error_code) *error_code = REDIS_CLUSTER_REDIR_MOVED;

    // 返回负责处理槽 slot 的节点 n
    return n;
}
复制代码

计算 hashSlot(槽),使用crc16算法

这里要特别注意的是:
特殊语法: {key_with_hash} key_without_hash
如果 key 有包括 “{}“ 的部分,就用 ”{}“ 的部分做 hash,以定位到同一个 redis 节点的键。
注意:只有第一个 “{}” 才有用

unsigned int keyHashSlot(char *key, int keylen) {
    int s, e; /* start-end indexes of { and } */

    for (s = 0; s < keylen; s++)
        if (key[s] == '{') break;

    /* No '{' ? Hash the whole key. This is the base case. */
    if (s == keylen) return crc16(key,keylen) & 0x3FFF;

    /* '{' found? Check if we have the corresponding '}'. */
    for (e = s+1; e < keylen; e++)
        if (key[e] == '}') break;

    /* No '}' or nothing betweeen {} ? Hash the whole key. */
    if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;

    /* If we are here there is both a { and a } on its right. Hash
     * what is in the middle between { and }. */
    return crc16(key+s+1,e-s-1) & 0x3FFF;
}

复制代码

从上面代码可以分析出

  1. 指令到达redis服务端,redis就会检查是否是集群状态。
  2. 如果非集群状态直接执行命令
  3. 如果是集群状态,会根据key去找到对应的hash槽再找到对应的redis服务端转发
  4. 我们需要注意的是,redis只会去匹配第一个key的hash槽
  5. 但是,如果其他key使用hash算法算出来的slot不一致时,这个命令执行失败(这个很重要,很多时候我们为了方面使用复合的命令,或者使用lua脚本。这个在redis单机版变到集群版很常见。而且,后面跟运维小姐姐沟通后,就算我们用 腾讯云和阿里云的redis,他们只是兼容的复合命令,没有兼容lua脚本(后话)
  6. 如果 key 有包括 “{}“ 的部分,就用 ”{}“ 的部分做 hash,以定位到同一个 redis 节点的键。(如果实在想要复合key,可以考虑用这个。还有lua脚本用得上)

Redis 单key的普通命令

一些普通的命令就会根据 firstkey 值取到对应的 slot 槽,转发到对应的redis服务

按照 firstkey 值转发到对应 slot区间的服

Redis 多个key的49.234.101.179:6376> set name zhangsan
直接转发到对应 slot的服
-> Redirected to slot [5798] located at 49.234.101.179:6372

49.234.101.179:6372> lpush list value1
直接转发到对应 slot的服
-> Redirected to slot [12291] located at 49.234.101.179:6373
(integer) 1

127.0.0.1:6376> sadd set value1

-> Redirected to slot [2964] located at 49.234.101.179:6371
(integer) 1

49.234.101.179:6371> hset map1 key1 value1
直接转发到对应 slot的服
-> Redirected to slot [8740] located at 49.234.101.179:6372
(integer) 1
复制代码

Redis 多key的复合命令

redis 127.0.0.1:6379> MGET KEY1 KEY2 .. KEYN
复制代码

如果其他key使用hash算法算出来的slot不一致时,这个命令执行失败(这个很重要,很多时候我们为了方面使用复合的命令就会发生异常:

(error) CROSSSLOT Keys in request don't hash to the same slot
复制代码

解法就是:在定义需要用到lua的数据结构的时候, key 可以先加上 “{key}”前缀作为要定位在那个slot里面

那当然他不好的地方就是,会把 “{key}” 这些数据分配到同一个slot上面,没有达到之前分区的效果。

官方的解释:

键哈希标签(Keys hash tags)
计算哈希槽可以实现哈希标签(hash tags),但这有一个例外。哈希标签是确保两个键都在同一个哈希槽里的一种方式。将来也许会使用到哈希标签,例如为了在集群稳定的情况下(没有在做碎片重组操作)允许某些多键操作。
为了实现哈希标签,哈希槽是用另一种不同的方式计算的。基本来说,如果一个键包含一个 “{…}” 这样的模式,只有 { 和 } 之间的字符串会被用来做哈希以获取哈希槽。但是由于可能出现多个 { 或 },计算的算法如下:
如果键包含一个 { 字符。
那么在 { 的右边就会有一个 }。
在 { 和 } 之间会有一个或多个字符,第一个 } 一定是出现在第一个 { 之后。
复制代码

Redis Lua 脚本

Lua脚本本质上还是一个多key的复合命令,所以跟复用的key命令一起受影响

Redis 分布式锁
为什么 我们的Redis的分布式锁实现方式也是lua脚本,也有多个参数,就没有受影响?

Redisson lua 脚本实现分布式锁:
local key = KEYS[1]; -- 第1个参数,锁的key
local threadId = ARGV[1]; -- 第2个参数,线程唯一标识
local releaseTime = ARGV[2]; -- 第3个参数,锁的自动释放时间

if(redis.call('exists', key) == 0) then -- 判断锁是否已存在
    redis.call('hset', key, threadId, '1'); -- 不存在, 则获取锁
    redis.call('expire', key, releaseTime); -- 设置有效期
    return 1; -- 返回结果
end;

if(redis.call('hexists', key, threadId) == 1) then -- 锁已经存在,判断threadId是否是自己    
    redis.call('hincrby', key, threadId, '1'); -- 如果是自己,则重入次数+1
    redis.call('expire', key, releaseTime); -- 设置有效期
    return 1; -- 返回结果
end;
return 0; -- 代码走到这里,说明获取锁的不是自己,获取锁失败
复制代码

会遇到 “ 如果firstkey和其他key使用hash算法算出来的slot不一致时,这个命令执行失败 ”这个问题吗?

答案是不会的:

  • 因为这段代码里面只有一个 key
  • local key = KEYS[1]; — 第1个参数,锁的key
  • local threadId = ARGV[1]; — 第2个参数,线程唯一标识
  • local releaseTime = ARGV[2]; — 第3个参数,锁的自动释放时间

第二三个参数都是 ARGV 数组里面的。

参考文章:
慕课网《高级Redis应用进阶课 一站式Redis解决方案》
coding.imooc.com/learn/list/…

Redis Cluster 有哪些优势和限制?
www.cnblogs.com/lovezbs/p/1…

redis 3.0 注解版源码
gitee.com/ckl111/redi…

redis 6.0 源码
github.com/redis/redis…

Redis 集群规范 :
redis.cn/topics/clus…

Redis 集群教程:
redis.cn/topics/clus…

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享