Redis 6.0.6 Source Code Reading: How Sentinel Works

Preface

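This article explains how Redis Sentinel works and how it is implemented. Redis can replicate data in a master-replica setup, but when the master fails, switching to another server has to be done by hand. Master-replica replication alone is therefore not highly available, and the service is briefly unavailable during the switch. Redis uses Sentinel to monitor the master and replica nodes. When Sentinel detects that the master is unavailable, it triggers a failover and switches servers automatically.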

Main Text

[Figure: Sentinel architecture]

In a master-replica deployment, several Sentinel nodes can be configured to monitor the master and the replicas. They provide:

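  • Monitoring: Sentinel periodically sends commands to the other nodes to check whether they are alive.
  • Failover: when enough Sentinels consider the master down, one Sentinel is elected to perform the failover. That Sentinel selects a replica and promotes it to master.
  • Notification: once the new master has been chosen, the result is announced.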

Configuration and Initialization

sentinel monitor <master-name> <ip> <port> <count>

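This line declares the master to monitor. count (the quorum) is the number of Sentinels that must agree the master is down before it is considered down. The recommended value is half the number of Sentinels plus one.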

sentinel down-after-milliseconds <master-name> 100000

This sets the timeout for the given master: if the master does not reply within this time, it is considered down.

sentinel failover-timeout mymaster 180000

This sets the failover timeout: if a failover does not complete within this time, it is considered failed.
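The recommended quorum is plain integer arithmetic. The helper below computes it. The name recommended_quorum is hypothetical and not a Redis function. For example, 3 Sentinels give a quorum of 2, and 5 give 3.

```c
#include <assert.h>

/* Hypothetical helper: quorum = half the number of Sentinels
 * (integer division) plus one, as recommended above. */
static int recommended_quorum(int sentinels) {
    assert(sentinels >= 1);    /* a deployment needs at least one Sentinel */
    return sentinels / 2 + 1;
}
```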

char *sentinelHandleConfiguration(char **argv, int argc) {
    sentinelRedisInstance *ri;

    if (!strcasecmp(argv[0],"monitor") && argc == 5) {
        /* monitor <name> <host> <port> <quorum> */
        // quorum: how many Sentinels must consider the master unavailable before it is treated as down and a failover can start
        int quorum = atoi(argv[4]);

        if (quorum <= 0) return "Quorum must be 1 or greater.";
        // create the master instance
        if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
                                        atoi(argv[3]),quorum,NULL) == NULL)
        {
            switch(errno) {
            case EBUSY: return "Duplicated master name.";
            case ENOENT: return "Can't resolve master instance hostname.";
            case EINVAL: return "Invalid port number";
            }
        }
        // down-after-milliseconds: an instance that does not reply for this many milliseconds is considered down
    } else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {
        /* down-after-milliseconds <name> <milliseconds> */
        // the master must already be defined
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        ri->down_after_period = atoi(argv[2]);
        if (ri->down_after_period <= 0)
            return "negative or zero time parameter.";
        sentinelPropagateDownAfterPeriod(ri);
        // failover timeout: if it is exceeded, the failover is considered failed
    } else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) {
        /* failover-timeout <name> <milliseconds> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        ri->failover_timeout = atoi(argv[2]);
        if (ri->failover_timeout <= 0)
            return "negative or zero time parameter.";
        // how many replicas may resync with the new master at the same time
    } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) {
        /* parallel-syncs <name> <milliseconds> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        ri->parallel_syncs = atoi(argv[2]);
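        // ... further options omitted, e.g. notification-script (a script run on events) and client-reconfig-script ...
    } else {
        return "Unrecognized sentinel configuration statement.";
    }
    return NULL;
}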

sentinelHandleConfiguration implements the options described above. For monitor, it calls createSentinelRedisInstance to create a master instance.

sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
    sentinelRedisInstance *ri;
    sentinelAddr *addr;
    dict *table = NULL;
    char slavename[NET_PEER_ID_LEN], *sdsname;

    serverAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
    serverAssert((flags & SRI_MASTER) || master != NULL);

    /* Check address validity. */
    addr = createSentinelAddr(hostname,port);
    if (addr == NULL) return NULL;

    /* For slaves use ip:port as name. */
    if (flags & SRI_SLAVE) {
        anetFormatAddr(slavename, sizeof(slavename), hostname, port);
        name = slavename;
    }

    if (flags & SRI_MASTER) table = sentinel.masters;
    else if (flags & SRI_SLAVE) table = master->slaves;
    else if (flags & SRI_SENTINEL) table = master->sentinels;
    // reject a name that already exists in the target table
    sdsname = sdsnew(name);
    if (dictFind(table,sdsname)) {
        releaseSentinelAddr(addr);
        sdsfree(sdsname);
        errno = EBUSY;
        return NULL;
    }
    /* ... allocate and initialize ri, then add it to table ... */
    return ri;
}

sentinel.masters stores the master instances. Each master instance holds two dicts: slaves for its replicas, and sentinels for the other Sentinels monitoring it.

void initSentinel(void) {
    unsigned int j;

    // drop the regular Redis command table and register the Sentinel commands
    dictEmpty(server.commands,NULL);
    for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
        int retval;
        struct redisCommand *cmd = sentinelcmds+j;

        retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
        serverAssert(retval == DICT_OK);

        if (populateCommandTableParseFlags(cmd,cmd->sflags) == C_ERR)
            serverPanic("Unsupported command flag");
    }

    // initialize the Sentinel state
    sentinel.current_epoch = 0;
    sentinel.masters = dictCreate(&instancesDictType,NULL);
    sentinel.tilt = 0;
    sentinel.tilt_start_time = 0;
    sentinel.previous_time = mstime();
    sentinel.running_scripts = 0;
    sentinel.scripts_queue = listCreate();
    sentinel.announce_ip = NULL;
    sentinel.announce_port = 0;
    sentinel.simfailure_flags = SENTINEL_SIMFAILURE_NONE;
    sentinel.deny_scripts_reconfig = SENTINEL_DEFAULT_DENY_SCRIPTS_RECONFIG;
    memset(sentinel.myid,0,sizeof(sentinel.myid));
}

initSentinel initializes the Sentinel's own state. current_epoch plays a role similar to the term in Raft: it is the current epoch.

Principles

[Figure: how Sentinel works]

Sentinel performs its monitoring from a periodic timer task, which consists of the following parts:

Node communication

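  • PING every 1 s: checks connectivity; the send time and the reply time are recorded.
  • hello every 2 s: publishes this Sentinel's own information and its view of the master, so that Sentinels can exchange master and epoch updates.
  • INFO every 10 s: requests the target's node information; among other things, this is how replicas are discovered.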

Down detection

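  • Subjectively down (SDOWN): a Sentinel's PINGs time out, so that Sentinel on its own considers the node down.
  • Objectively down (ODOWN): after marking the master SDOWN, the Sentinel asks the other Sentinels whether they also consider it down. If enough of them agree, the master is ODOWN.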

Failover

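Once the master is objectively down, a Sentinel can ask the other Sentinels to elect it as the one that performs the failover. The election works like Raft's leader election. After winning, the leader selects a replica and promotes it to master.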

Source Code

void sentinelTimer(void) {
    // check the time since the previous call to decide whether to enter TILT mode
    sentinelCheckTiltCondition();
    sentinelHandleDictOfRedisInstances(sentinel.masters);
    sentinelRunPendingScripts();
    sentinelCollectTerminatedScripts();
    sentinelKillTimedoutScripts();
    /* ... remainder omitted ... */
}

Let's go through what this timer does, step by step.

void sentinelCheckTiltCondition(void) {
    mstime_t now = mstime();
    mstime_t delta = now - sentinel.previous_time;

    if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
        sentinel.tilt = 1;
        sentinel.tilt_start_time = mstime();
        sentinelEvent(LL_WARNING,"+tilt",NULL,"#tilt mode entered");
    }
    sentinel.previous_time = mstime();
}

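sentinelCheckTiltCondition measures the time since the previous timer run. If the clock went backwards, or more than 2 s (SENTINEL_TILT_TRIGGER) passed, the Sentinel enters TILT mode. A pause that long, or a system clock jump, means its view may be stale and inconsistent with the other Sentinels, so it should not be trusted. In TILT mode the Sentinel keeps sending its periodic commands and collecting replies. It takes no action (no SDOWN/ODOWN checks, no failover) until SENTINEL_TILT_PERIOD has elapsed.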

void sentinelHandleDictOfRedisInstances(dict *instances) {
    dictIterator *di;
    dictEntry *de;
    sentinelRedisInstance *switch_to_promoted = NULL;

    // iterate over the instances in this dict
    di = dictGetIterator(instances);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);

        sentinelHandleRedisInstance(ri);
        if (ri->flags & SRI_MASTER) {
            sentinelHandleDictOfRedisInstances(ri->slaves);
            sentinelHandleDictOfRedisInstances(ri->sentinels);
            // the failover reached UPDATE_CONFIG: switch to the promoted replica after the loop
            if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
                switch_to_promoted = ri;
            }
        }
    }
    if (switch_to_promoted)
        sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
    dictReleaseIterator(di);
}

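sentinelHandleDictOfRedisInstances iterates over the masters. It calls sentinelHandleRedisInstance on each master, then recursively on that master's replicas and Sentinels. If a master's failover has reached SENTINEL_FAILOVER_STATE_UPDATE_CONFIG, it switches to the promoted replica after the loop.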

void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
    // (re)connect to the instance if needed
    sentinelReconnectInstance(ri);
    // send the periodic PING, INFO and hello
    sentinelSendPeriodicCommands(ri);

    // in TILT mode do nothing else; leave TILT once SENTINEL_TILT_PERIOD has passed
    if (sentinel.tilt) {
        if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
        sentinel.tilt = 0;
        sentinelEvent(LL_WARNING,"-tilt",NULL,"#tilt mode exited");
    }

    
    /* Every kind of instance */
    sentinelCheckSubjectivelyDown(ri);

    /* Masters and slaves */
    if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
        /* Nothing so far. */
    }

    /* Only masters */
    if (ri->flags & SRI_MASTER) {
        // check whether the master is ODOWN
        sentinelCheckObjectivelyDown(ri);
        // start a failover if needed; if one starts, ask the other Sentinels immediately
        if (sentinelStartFailoverIfNeeded(ri))
            sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
        sentinelFailoverStateMachine(ri);
        sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
    }
}

The sentinelHandleRedisInstance flow

Creating connections

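The first step is sentinelReconnectInstance. If the instance is not connected yet, it opens an asynchronous command connection to the instance's host and port. For masters and replicas it also opens a Pub/Sub connection. The source of this function is omitted here.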

Sending commands

sentinelSendPeriodicCommands sends the INFO, PING and hello commands:

void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
    mstime_t now = mstime();
    mstime_t info_period, ping_period;
    int retval;

    // return early if the link is disconnected
    if (ri->link->disconnected) return;

    // too many commands without replies: stop at SENTINEL_MAX_PENDING_COMMANDS (100) per link reference to avoid piling up
    if (ri->link->pending_commands >=
        SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return;

    // INFO every 10 s, but every 1 s for a replica whose master is ODOWN or failing over, or whose master link is down, to pick up changes faster
    if ((ri->flags & SRI_SLAVE) &&
        ((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) ||
         (ri->master_link_down_time != 0)))
    {
        info_period = 1000;
    } else {
        info_period = SENTINEL_INFO_PERIOD;
    }

    // PING every 1 s, or every down_after_period if that is shorter
    ping_period = ri->down_after_period;
    if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;

    // INFO goes only to masters and replicas, not to other Sentinels
    if ((ri->flags & SRI_SENTINEL) == 0 &&
        (ri->info_refresh == 0 ||
        (now - ri->info_refresh) > info_period))
    {
        retval = redisAsyncCommand(ri->link->cc,
            sentinelInfoReplyCallback, ri, "%s",
            sentinelInstanceMapCommand(ri,"INFO"));
        if (retval == C_OK) ri->link->pending_commands++;
    }

    // PING if no PONG arrived within ping_period and the last PING is older than ping_period/2
    if ((now - ri->link->last_pong_time) > ping_period &&
               (now - ri->link->last_ping_time) > ping_period/2) {
        sentinelSendPing(ri);
    }

    // publish hello every 2 s; this is how other Sentinels are discovered
    if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
        sentinelSendHello(ri);
    }
}

The flow is:

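  • At most 100 commands (per link reference) may await replies; once that many are pending, nothing more is sent.
  • INFO is sent only to masters and replicas, normally every 10 s. When the master is down or a failover is in progress, it is sent every 1 s to pick up changes faster.
  • PING is sent every 1 s (or every down-after-milliseconds, if that is shorter).
  • hello is published every 2 s.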
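The scheduling rules above reduce to a few comparisons. The sketch below assumes the Redis 6.0 defaults SENTINEL_INFO_PERIOD = 10000, SENTINEL_PING_PERIOD = 1000 and SENTINEL_PUBLISH_PERIOD = 2000 (all in ms). The helper functions are hypothetical and take plain timestamps instead of a sentinelRedisInstance.

```c
#include <assert.h>
#include <stdint.h>

/* Assumed Redis 6.0 defaults, in milliseconds. */
#define SENTINEL_INFO_PERIOD    10000
#define SENTINEL_PING_PERIOD    1000
#define SENTINEL_PUBLISH_PERIOD 2000

/* INFO interval; `fast` stands for "replica whose master is ODOWN or
 * failing over, or whose link to the master is down". */
static int64_t info_period(int is_replica, int fast) {
    return (is_replica && fast) ? 1000 : SENTINEL_INFO_PERIOD;
}

/* PING interval: the smaller of down_after_period and 1 s. */
static int64_t ping_period(int64_t down_after_period) {
    assert(down_after_period > 0);
    return down_after_period > SENTINEL_PING_PERIOD ? SENTINEL_PING_PERIOD
                                                    : down_after_period;
}

/* PING only if no PONG arrived for a full period and the last PING is
 * older than half a period, so PINGs are not sent back to back. */
static int should_ping(int64_t now, int64_t last_pong, int64_t last_ping,
                       int64_t period) {
    return (now - last_pong) > period && (now - last_ping) > period / 2;
}

/* hello is published once every SENTINEL_PUBLISH_PERIOD. */
static int should_hello(int64_t now, int64_t last_pub) {
    return (now - last_pub) > SENTINEL_PUBLISH_PERIOD;
}
```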
The PING command

[Figure: PING command]

int sentinelSendPing(sentinelRedisInstance *ri) {
    int retval = redisAsyncCommand(ri->link->cc,
        sentinelPingReplyCallback, ri, "%s",
        sentinelInstanceMapCommand(ri,"PING"));
    if (retval == C_OK) {
        ri->link->pending_commands++;
        ri->link->last_ping_time = mstime();
        /* We update the active ping time only if we received the pong for
         * the previous ping, otherwise we are technically waiting since the
         * first ping that did not receive a reply. */
        if (ri->link->act_ping_time == 0)
            ri->link->act_ping_time = ri->link->last_ping_time;
        return 1;
    } else {
        return 0;
    }
}

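This is the code that sends PING. It updates last_ping_time, and sets act_ping_time only when the previous PING has been answered (act_ping_time is 0). The peer simply replies to the PING. That side is trivial, so let's look at sentinelPingReplyCallback instead.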

void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = privdata;
    instanceLink *link = c->data;
    redisReply *r;

    if (!reply || !link) return;
    link->pending_commands--;
    r = reply;

    if (r->type == REDIS_REPLY_STATUS ||
        r->type == REDIS_REPLY_ERROR) {
        if (strncmp(r->str,"PONG",4) == 0 ||
            strncmp(r->str,"LOADING",7) == 0 ||
            strncmp(r->str,"MASTERDOWN",10) == 0)
        {
            link->last_avail_time = mstime();
            link->act_ping_time = 0; 
        } else {
            // instance is busy running a Lua script while SDOWN: send SCRIPT KILL once
            if (strncmp(r->str,"BUSY",4) == 0 &&
                (ri->flags & SRI_S_DOWN) &&
                !(ri->flags & SRI_SCRIPT_KILL_SENT))
            {
                if (redisAsyncCommand(ri->link->cc,
                        sentinelDiscardReplyCallback, ri,
                        "%s KILL",
                        sentinelInstanceMapCommand(ri,"SCRIPT")) == C_OK)
                {
                    ri->link->pending_commands++;
                }
                ri->flags |= SRI_SCRIPT_KILL_SENT;
            }
        }
    }
    // any reply, valid or not, updates last_pong_time
    link->last_pong_time = mstime();
}

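last_pong_time is updated whenever any reply arrives. last_avail_time is updated, and act_ping_time reset, only for a valid reply: PONG, LOADING or MASTERDOWN.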
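To see how the timestamps interact, here is a trimmed-down model. The names link_times, on_ping_sent and on_ping_reply are hypothetical, and the SCRIPT KILL branch is left out. act_ping_time keeps the time of the oldest unanswered PING, which is what the SDOWN check later measures from.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical, trimmed-down copy of the timestamps in instanceLink. */
typedef struct {
    int64_t act_ping_time;    /* oldest unanswered PING, 0 if none */
    int64_t last_ping_time;
    int64_t last_pong_time;
    int64_t last_avail_time;
} link_times;

static void on_ping_sent(link_times *l, int64_t now) {
    l->last_ping_time = now;
    if (l->act_ping_time == 0)        /* keep the first unanswered PING */
        l->act_ping_time = now;
}

/* reply: the status or error string returned for PING. */
static void on_ping_reply(link_times *l, const char *reply, int64_t now) {
    assert(reply != NULL);
    if (strncmp(reply, "PONG", 4) == 0 ||
        strncmp(reply, "LOADING", 7) == 0 ||
        strncmp(reply, "MASTERDOWN", 10) == 0) {
        l->last_avail_time = now;     /* valid reply: instance available */
        l->act_ping_time = 0;         /* nothing pending any more */
    }
    l->last_pong_time = now;          /* any reply counts as a pong */
}
```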

The hello message

[Figure: hello message]

    snprintf(payload,sizeof(payload),
        "%s,%d,%s,%llu," // this Sentinel: ip, port, myid, current_epoch
        "%s,%s,%d,%llu", // the master: name, ip, port, config_epoch
        announce_ip, announce_port, sentinel.myid,
        (unsigned long long) sentinel.current_epoch,
        /* --- */
        master->name,master_addr->ip,master_addr->port,
        (unsigned long long) master->config_epoch);
    retval = redisAsyncCommand(ri->link->cc,
        sentinelPublishReplyCallback, ri, "%s %s %s",
        sentinelInstanceMapCommand(ri,"PUBLISH"),
        SENTINEL_HELLO_CHANNEL,payload);
    if (retval != C_OK) return C_ERR;
    ri->link->pending_commands++;
    return C_OK;

This is part of the code that sends hello. The message carries this Sentinel's ip, port, myid and current_epoch, followed by the master's name, ip, port and config_epoch.
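The payload is just eight comma-separated fields. The sketch below builds one in the same field order and counts its fields, since the receiver only accepts exactly eight. build_hello and count_fields are hypothetical helpers, and "abc" stands in for a real 40-character runid.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: same 8-field order as sentinelSendHello. */
static int build_hello(char *buf, size_t len,
                       const char *ip, int port, const char *myid,
                       unsigned long long current_epoch,
                       const char *master_name, const char *master_ip,
                       int master_port, unsigned long long config_epoch) {
    assert(buf != NULL && len > 0);
    return snprintf(buf, len, "%s,%d,%s,%llu,%s,%s,%d,%llu",
                    ip, port, myid, current_epoch,         /* this Sentinel */
                    master_name, master_ip, master_port,   /* the master */
                    config_epoch);
}

/* Counts comma-separated fields; the receiver requires exactly 8. */
static int count_fields(const char *s) {
    int n = 1;
    while ((s = strchr(s, ',')) != NULL) {
        n++;
        s++;                          /* continue after the comma */
    }
    return n;
}
```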

void sentinelPublishCommand(client *c) {
    if (strcmp(c->argv[1]->ptr,SENTINEL_HELLO_CHANNEL)) {
        addReplyError(c, "Only HELLO messages are accepted by Sentinel instances.");
        return;
    }
    sentinelProcessHelloMessage(c->argv[2]->ptr,sdslen(c->argv[2]->ptr));
    addReplyLongLong(c,1);
}

This is the receiving side of the hello message. The actual work is done in sentinelProcessHelloMessage.

void sentinelProcessHelloMessage(char *hello, int hello_len) {
    /* Message format:
     * 0=ip,1=port,2=runid,3=current_epoch,4=master_name,
     * 5=master_ip,6=master_port,7=master_config_epoch. */
    int numtokens, port, removed, master_port;
    uint64_t current_epoch, master_config_epoch;
    char **token = sdssplitlen(hello, hello_len, ",", 1, &numtokens);
    sentinelRedisInstance *si, *master;

    if (numtokens == 8) {
        // look up the master by name
        master = sentinelGetMasterByName(token[4]);
        if (!master) goto cleanup;

        port = atoi(token[1]);
        master_port = atoi(token[6]);
        // look up the sending Sentinel by address and runid
        si = getSentinelRedisInstanceByAddrAndRunID(
                        master->sentinels,token[0],port,token[2]);
        current_epoch = strtoull(token[3],NULL,10);
        master_config_epoch = strtoull(token[7],NULL,10);

        if (!si) {
            // unknown sender: remove any Sentinel with the same runid (its address changed)
            removed = removeMatchingSentinelFromMaster(master,token[2]);
            if (removed) {
                sentinelEvent(LL_NOTICE,"+sentinel-address-switch",master,
                    "%@ ip %s port %d for %s", token[0],port,token[2]);
            } else {
                // another Sentinel with the same address is stale: mark its port 0 (invalid)
                sentinelRedisInstance *other =
                    getSentinelRedisInstanceByAddrAndRunID(
                        master->sentinels, token[0],port,NULL);
                if (other) {
                    sentinelEvent(LL_NOTICE,"+sentinel-invalid-addr",other,"%@");
                    other->addr->port = 0;
                    // propagate the change to every master that knows this Sentinel, keeping them consistent
                    sentinelUpdateSentinelAddressInAllMasters(other);
                }
            }


            // add the sender as a new Sentinel
            si = createSentinelRedisInstance(token[2],SRI_SENTINEL,
                            token[0],port,master->quorum,master);

            if (si) {
                if (!removed) sentinelEvent(LL_NOTICE,"+sentinel",si,"%@");
                si->runid = sdsnew(token[2]);
                sentinelTryConnectionSharing(si);
                if (removed) sentinelUpdateSentinelAddressInAllMasters(si);
                sentinelFlushConfig();
            }
        }

        // adopt a larger current_epoch
        if (current_epoch > sentinel.current_epoch) {
            sentinel.current_epoch = current_epoch;
            sentinelFlushConfig();
            sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
                (unsigned long long) sentinel.current_epoch);
        }

        // newer master config_epoch: switch to the advertised master address
        if (si && master->config_epoch < master_config_epoch) {
            master->config_epoch = master_config_epoch;
            if (master_port != master->addr->port ||
                strcmp(master->addr->ip, token[5]))
            {
                sentinelAddr *old_addr;

                sentinelEvent(LL_WARNING,"+config-update-from",si,"%@");
                sentinelEvent(LL_WARNING,"+switch-master",
                    master,"%s %s %d %s %d",
                    master->name,
                    master->addr->ip, master->addr->port,
                    token[5], master_port);

                old_addr = dupSentinelAddr(master->addr);
                sentinelResetMasterAndChangeAddress(master, token[5], master_port);
                sentinelCallClientReconfScript(master,
                    SENTINEL_OBSERVER,"start",
                    old_addr,master->addr);
                releaseSentinelAddr(old_addr);
            }
        }

        /* Update the state of the Sentinel. */
        if (si) si->last_hello_time = mstime();
    }

cleanup:
    sdsfreesplitres(token,numtokens);
}

The flow is:

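  • Look up the master instance and the sending Sentinel's instance from the message.
  • If the sender is not yet known under this master, remove any entry with the same runid (its address changed), or invalidate another Sentinel that uses the same address. Then create a new Sentinel instance, so the Sentinel list stays consistent.
  • As in Raft, if the sender's current_epoch is larger than ours, adopt it.
  • If the master config_epoch in the message is larger than the one we hold, the master has changed, so update the master's address.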

The reply callback for hello only updates timestamps, so it is omitted here.
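Below is a condensed model of the two epoch rules in sentinelProcessHelloMessage, under simplifying assumptions. local_view and apply_hello are hypothetical. The master-name lookup and the Sentinel bookkeeping are skipped. Fields are split with strtok, which (unlike sdssplitlen) collapses empty fields.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical, simplified state; in Redis current_epoch is global
 * (sentinel.current_epoch) and the rest lives in the master instance. */
typedef struct {
    uint64_t current_epoch;
    uint64_t master_config_epoch;
    char master_ip[46];
    int master_port;
} local_view;

/* Applies one hello payload. Returns 1 if the master address was
 * switched, 0 if not, -1 if the payload does not have 8 fields. */
static int apply_hello(local_view *v, const char *payload) {
    char copy[512];
    char *tok[8];
    int n = 0;

    assert(v != NULL && payload != NULL);
    if (strlen(payload) >= sizeof(copy)) return -1;
    strcpy(copy, payload);

    char *p = strtok(copy, ",");
    while (p != NULL && n < 8) {
        tok[n++] = p;
        p = strtok(NULL, ",");
    }
    if (n != 8 || p != NULL) return -1;          /* need exactly 8 fields */

    uint64_t epoch = strtoull(tok[3], NULL, 10);
    uint64_t cfg_epoch = strtoull(tok[7], NULL, 10);
    int mport = atoi(tok[6]);

    if (epoch > v->current_epoch)                /* Raft-like: adopt larger epoch */
        v->current_epoch = epoch;

    if (cfg_epoch > v->master_config_epoch) {    /* newer master configuration */
        v->master_config_epoch = cfg_epoch;
        if (mport != v->master_port || strcmp(v->master_ip, tok[5]) != 0) {
            strncpy(v->master_ip, tok[5], sizeof(v->master_ip) - 1);
            v->master_ip[sizeof(v->master_ip) - 1] = '\0';
            v->master_port = mport;
            return 1;                            /* like +switch-master */
        }
    }
    return 0;
}
```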

The INFO command
    if (defsections || allsections || !strcasecmp(section,"sentinel")) {
        dictIterator *di;
        dictEntry *de;
        int master_id = 0;

        if (sections++) info = sdscat(info,"\r\n");
        info = sdscatprintf(info,
            "# Sentinel\r\n"
            "sentinel_masters:%lu\r\n"
            "sentinel_tilt:%d\r\n"
            "sentinel_running_scripts:%d\r\n"
            "sentinel_scripts_queue_length:%ld\r\n"
            "sentinel_simulate_failure_flags:%lu\r\n",
            dictSize(sentinel.masters),
            sentinel.tilt,
            sentinel.running_scripts,
            listLength(sentinel.scripts_queue),
            sentinel.simfailure_flags);

        di = dictGetIterator(sentinel.masters);
        while((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *ri = dictGetVal(de);
            char *status = "ok";

            if (ri->flags & SRI_O_DOWN) status = "odown";
            else if (ri->flags & SRI_S_DOWN) status = "sdown";
            info = sdscatprintf(info,
                "master%d:name=%s,status=%s,address=%s:%d,"
                "slaves=%lu,sentinels=%lu\r\n",
                master_id++, ri->name, status,
                ri->addr->ip, ri->addr->port,
                dictSize(ri->slaves),
                dictSize(ri->sentinels)+1);
        }
        dictReleaseIterator(di);
    }

    addReplyBulkSds(c, info);
}

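This is part of the Sentinel's own INFO command handler, i.e. what a Sentinel reports when it is queried. The INFO sent by sentinelSendPeriodicCommands goes to masters and replicas, which answer with their regular INFO output (including the replication section). That reply is handled by the following callback.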

void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = privdata;
    instanceLink *link = c->data;
    redisReply *r;

    if (!reply || !link) return;
    link->pending_commands--;
    r = reply;

    if (r->type == REDIS_REPLY_STRING)
        sentinelRefreshInstanceInfo(ri,r->str);
}

This is the INFO reply callback. sentinelRefreshInstanceInfo is long, but it essentially does the following:

  • If the instance is a master, parse the replicas it lists:
if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
    if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
                atoi(port), ri->quorum, ri)) != NULL)
    {
        sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
        sentinelFlushConfig();
    }
}

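It extracts each replica's host and port and creates a replica instance if that replica is not known yet. This is how replicas are discovered through INFO.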

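  • If the instance is a replica, update the recorded replica information.
  • The recorded role may differ from the reported one. If we record the instance as a master but it reports itself as a replica, nothing is done here. Such a master is eventually flagged SDOWN (see the subjective-down check below), which leads to a failover.
  • If the instance reports itself as a replica but replicates from a different master than the one we record, Sentinel sends SLAVEOF to point it at the correct master.
  • If a replica reports itself as a master and it is the replica being promoted, the failover state machine advances. Otherwise, Sentinel turns it back into a replica with SLAVEOF.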
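For reference, a master lists each replica in the replication section of its INFO output as a line such as slave0:ip=127.0.0.1,port=6380,state=online,.... The helper below extracts the ip and port from such a line. It is hypothetical and not Redis's parser, which is written differently and also accepts an older INFO format.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: extracts ip and port from one
 * "slaveN:ip=...,port=...,state=..." line of a master's INFO output.
 * Returns 1 on success, 0 for any other line. */
static int parse_slave_line(const char *line, char ip[46], int *port) {
    assert(line != NULL && ip != NULL && port != NULL);
    if (strncmp(line, "slave", 5) != 0) return 0;
    /* %*d skips the index; %45[^,] reads the ip up to the next comma */
    return sscanf(line, "slave%*d:ip=%45[^,],port=%d", ip, port) == 2;
}
```

Lines such as slave_repl_offset:42 also start with "slave", but they fail the %d match and are rejected.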

Subjective down check

void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
    mstime_t elapsed = 0;

    if (ri->link->act_ping_time)
        elapsed = mstime() - ri->link->act_ping_time;
    else if (ri->link->disconnected)
        elapsed = mstime() - ri->link->last_avail_time;

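    // drop stale connections so that they get re-established:
    // 1) command link: connected for longer than SENTINEL_MIN_LINK_RECONNECT_PERIOD, and a PING has been pending with no PONG for more than half of down_after_period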
    if (ri->link->cc &&
        (mstime() - ri->link->cc_conn_time) >
        SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
        ri->link->act_ping_time != 0 &&
        (mstime() - ri->link->act_ping_time) > (ri->down_after_period/2) &&
        (mstime() - ri->link->last_pong_time) > (ri->down_after_period/2))
    {
        instanceLinkCloseConnection(ri->link,ri->link->cc);
    }

    // 2) Pub/Sub link: no activity for more than SENTINEL_PUBLISH_PERIOD*3
    if (ri->link->pc &&
        (mstime() - ri->link->pc_conn_time) >
         SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
        (mstime() - ri->link->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
    {
        instanceLinkCloseConnection(ri->link,ri->link->pc);
    }

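    // set or clear SDOWN; it is set when:
    // 1) there has been no valid reply for longer than down_after_period, or
    // 2) we record a master that reports itself as a replica, and it has kept doing so for too long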
    if (elapsed > ri->down_after_period ||
        (ri->flags & SRI_MASTER &&
         ri->role_reported == SRI_SLAVE &&
         mstime() - ri->role_reported_time >
          (ri->down_after_period+SENTINEL_INFO_PERIOD*2)))
    {
        /* Is subjectively down */
        if ((ri->flags & SRI_S_DOWN) == 0) {
            sentinelEvent(LL_WARNING,"+sdown",ri,"%@");
            ri->s_down_since_time = mstime();
            ri->flags |= SRI_S_DOWN;
        }
    } else {
        /* Is subjectively up */
        if (ri->flags & SRI_S_DOWN) {
            sentinelEvent(LL_WARNING,"-sdown",ri,"%@");
            ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
        }
    }
}

There are two criteria for subjective down:

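  1. No valid reply within down_after_period, measured from the oldest unanswered PING (or from the last valid reply if the link is disconnected).
  2. This Sentinel records the instance as a master, but the instance reports itself as a replica and has done so for longer than down_after_period + 2 * SENTINEL_INFO_PERIOD.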

If either condition holds, the instance is flagged SDOWN; otherwise the flag is cleared.

Objective down

void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
    dictIterator *di;
    dictEntry *de;
    unsigned int quorum = 0, odown = 0;

    if (master->flags & SRI_S_DOWN) {
        quorum = 1;
        // count the other Sentinels that report the master as down
        di = dictGetIterator(master->sentinels);
        while((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *ri = dictGetVal(de);

            if (ri->flags & SRI_MASTER_DOWN) quorum++;
        }
        dictReleaseIterator(di);
        if (quorum >= master->quorum) odown = 1;
    }

    /* Set the flag accordingly to the outcome. */
    if (odown) {
        if ((master->flags & SRI_O_DOWN) == 0) {
            sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d",
                quorum, master->quorum);
            master->flags |= SRI_O_DOWN;
            master->o_down_since_time = mstime();
        }
    } else {
        if (master->flags & SRI_O_DOWN) {
            sentinelEvent(LL_WARNING,"-odown",master,"%@");
            master->flags &= ~SRI_O_DOWN;
        }
    }
}

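The function counts the Sentinels whose flags contain SRI_MASTER_DOWN, plus this Sentinel itself (which already sees SDOWN). If the total reaches the configured quorum, the master is marked objectively down (ODOWN).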
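The ODOWN decision is a simple tally. Instead of iterating master->sentinels, this sketch takes an array of the other Sentinels' SRI_MASTER_DOWN flags. is_odown is a hypothetical name.

```c
#include <assert.h>

/* Hypothetical tally: others_down[i] is 1 if Sentinel i last replied
 * that the master is down (its SRI_MASTER_DOWN flag). */
static int is_odown(int self_sdown, const int *others_down, int n_others,
                    unsigned int quorum) {
    unsigned int votes;
    assert(n_others >= 0);
    if (!self_sdown) return 0;        /* ODOWN requires our own SDOWN */
    votes = 1;                        /* count ourselves */
    for (int i = 0; i < n_others; i++)
        if (others_down[i]) votes++;
    return votes >= quorum;
}
```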

Deciding whether to fail over

int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
    // a failover requires ODOWN
    if (!(master->flags & SRI_O_DOWN)) return 0;

    // a failover is already in progress
    if (master->flags & SRI_FAILOVER_IN_PROGRESS) return 0;

    // the last failover attempt started less than 2*failover_timeout ago: do not start yet (log this once)
    if (mstime() - master->failover_start_time <
        master->failover_timeout*2)
    {
        if (master->failover_delay_logged != master->failover_start_time) {
            time_t clock = (master->failover_start_time +
                            master->failover_timeout*2) / 1000;
            char ctimebuf[26];

            ctime_r(&clock,ctimebuf);
            ctimebuf[24] = '\0'; /* Remove newline. */
            master->failover_delay_logged = master->failover_start_time;
            serverLog(LL_WARNING,
                "Next failover delay: I will not start a failover before %s",
                ctimebuf);
        }
        return 0;
    }

    sentinelStartFailover(master);
    return 1;
}

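Only an objectively down master can be failed over, and a new attempt must wait 2 * failover_timeout after the previous one. The other Sentinels' opinions that ODOWN relies on (their SRI_MASTER_DOWN flags) are collected by sentinelAskMasterStateToOtherSentinels. sentinelHandleRedisInstance calls that function right after sentinelStartFailoverIfNeeded.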
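Here are the three early returns of sentinelStartFailoverIfNeeded, restated as a predicate over plain values. can_start_failover is hypothetical. With the failover-timeout of 180000 ms from the configuration section, a new attempt can start at the earliest 360000 ms after the previous one.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical predicate mirroring sentinelStartFailoverIfNeeded. */
static int can_start_failover(int odown, int in_progress, int64_t now,
                              int64_t failover_start_time,
                              int64_t failover_timeout) {
    assert(failover_timeout > 0);   /* the config rejects zero/negative */
    if (!odown) return 0;           /* only an ODOWN master */
    if (in_progress) return 0;      /* one failover at a time */
    /* the previous attempt started too recently */
    if (now - failover_start_time < failover_timeout * 2) return 0;
    return 1;
}
```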

if (sentinelStartFailoverIfNeeded(ri))
    sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
sentinelFailoverStateMachine(ri);
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);

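First, the code checks whether a failover should start. If it should, sentinelAskMasterStateToOtherSentinels is called with SENTINEL_ASK_FORCED, which asks the other Sentinels immediately instead of waiting for SENTINEL_ASK_PERIOD. The code then runs the failover state machine and calls sentinelAskMasterStateToOtherSentinels again without flags. That second call asks only when the previous reply is older than SENTINEL_ASK_PERIOD. Let's follow these functions in order.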

void sentinelStartFailover(sentinelRedisInstance *master) {
    serverAssert(master->flags & SRI_MASTER);

    master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
    master->flags |= SRI_FAILOVER_IN_PROGRESS;
    // increment current_epoch; it becomes this failover's epoch
    master->failover_epoch = ++sentinel.current_epoch;
    sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
        (unsigned long long) sentinel.current_epoch);
    sentinelEvent(LL_WARNING,"+try-failover",master,"%@");
    master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
    master->failover_state_change_time = mstime();
}

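This starts the failover. It sets the state to SENTINEL_FAILOVER_STATE_WAIT_START, sets SRI_FAILOVER_IN_PROGRESS and increments current_epoch, which becomes the failover epoch. failover_start_time also gets a random offset of up to SENTINEL_MAX_DESYNC ms, to desynchronize the Sentinels.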

void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetIterator(master->sentinels);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
        char port[32];
        int retval;

        // this Sentinel's last reply is too old: discard its down flag and its vote
        if (elapsed > SENTINEL_ASK_PERIOD*5) {
            ri->flags &= ~SRI_MASTER_DOWN;
            sdsfree(ri->leader);
            ri->leader = NULL;
        }

        // ask only while we see the master as SDOWN
        if ((master->flags & SRI_S_DOWN) == 0) continue;
        // skip disconnected Sentinels
        if (ri->link->disconnected) continue;
        if (!(flags & SENTINEL_ASK_FORCED) &&
            mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
            continue;

        // send: SENTINEL is-master-down-by-addr <ip> <port> <current_epoch> <runid>
        ll2string(port,sizeof(port),master->addr->port);
        retval = redisAsyncCommand(ri->link->cc,
                    sentinelReceiveIsMasterDownReply, ri,
                    "%s is-master-down-by-addr %s %s %llu %s",
                    sentinelInstanceMapCommand(ri,"SENTINEL"),
                    master->addr->ip, port,
                    sentinel.current_epoch,
                    (master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
                    sentinel.myid : "*");
        if (retval == C_OK) ri->link->pending_commands++;
    }
    dictReleaseIterator(di);
}

The function iterates over every Sentinel and sends it the command:

sentinel is-master-down-by-addr <ip> <port> <current_epoch> <runid>

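The command asks another Sentinel whether it considers the master down. It carries a runid that is either * or the sender's own id. The sender's own id is sent once its failover has started (failover_state > SENTINEL_FAILOVER_STATE_NONE). It asks the peer to vote for the sender as the leader that will coordinate the failover.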

        ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
            c->argv[2]->ptr,port,NULL);

        // report "down" only if not in TILT mode and this master is SDOWN here
        if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
                                    (ri->flags & SRI_MASTER))
            isdown = 1;

        // a runid other than "*" was sent: cast a vote
        if (ri && ri->flags & SRI_MASTER && strcasecmp(c->argv[5]->ptr,"*")) {
            leader = sentinelVoteLeader(ri,(uint64_t)req_epoch,
                                            c->argv[5]->ptr,
                                            &leader_epoch);
        }

        /* Reply with a three-elements multi-bulk reply:
         * down state, leader, vote epoch. */
        addReplyArrayLen(c,3);
        addReply(c, isdown ? shared.cone : shared.czero);
        addReplyBulkCString(c, leader ? leader : "*");
        addReplyLongLong(c, (long long)leader_epoch);
        if (leader) sdsfree(leader);

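This is the receiving side of is-master-down-by-addr. It replies whether this Sentinel considers the master subjectively down (never while in TILT mode). If a runid other than * was sent, it calls sentinelVoteLeader to cast a vote. The reply has three elements: the down state, the leader it voted for (or *), and the epoch of that vote.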

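The voting rule is simple and based on epochs, as in Raft leader election. A Sentinel votes at most once per epoch: its vote goes to the first requester whose epoch is greater than the epoch of its previous vote.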
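Below is a sketch of the vote, loosely modeled on Redis 6.0's sentinelVoteLeader. vote_state and vote_leader are hypothetical. Config flushing, event logging and the failover_start_time adjustment are omitted. The second call in the test shows the once-per-epoch rule: a later request in the same epoch gets the earlier vote back.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical per-master voting state. */
typedef struct {
    uint64_t current_epoch;   /* global in Redis */
    uint64_t leader_epoch;    /* epoch of our last vote for this master */
    char leader[41];          /* runid we voted for, "" if none */
} vote_state;

/* Returns the runid this Sentinel currently votes for (NULL if none)
 * and stores the epoch of that vote in *out_epoch. */
static const char *vote_leader(vote_state *s, uint64_t req_epoch,
                               const char *req_runid, uint64_t *out_epoch) {
    assert(req_runid != NULL && out_epoch != NULL);
    if (req_epoch > s->current_epoch)
        s->current_epoch = req_epoch;            /* adopt the newer epoch */

    if (s->leader_epoch < req_epoch && s->current_epoch <= req_epoch) {
        strncpy(s->leader, req_runid, sizeof(s->leader) - 1);
        s->leader[sizeof(s->leader) - 1] = '\0';
        s->leader_epoch = s->current_epoch;      /* at most one vote per epoch */
    }
    *out_epoch = s->leader_epoch;
    return s->leader[0] ? s->leader : NULL;
}
```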

        // record when this Sentinel replied and whether it considers the master down
        ri->last_master_down_reply_time = mstime();
        if (r->element[0]->integer == 1) {
            ri->flags |= SRI_MASTER_DOWN;
        } else {
            ri->flags &= ~SRI_MASTER_DOWN;
        }
        // the reply carries a vote: log it and record the leader and its epoch
        if (strcmp(r->element[1]->str,"*")) {
            /* If the runid in the reply is not "*" the Sentinel actually
             * replied with a vote. */
            sdsfree(ri->leader);
            if ((long long)ri->leader_epoch != r->element[2]->integer)
                serverLog(LL_WARNING,
                    "%s voted for %s %llu", ri->name,
                    r->element[1]->str,
                    (unsigned long long) r->element[2]->integer);
            ri->leader = sdsnew(r->element[1]->str);
            ri->leader_epoch = r->element[2]->integer;
        }
    }

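This is the reply callback for is-master-down-by-addr. It records the down flag and, if the reply contains a vote, the leader and its epoch.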

Executing the failover

void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
    serverAssert(ri->flags & SRI_MASTER);

    if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;

    switch(ri->failover_state) {
        // wait for the leader election
        case SENTINEL_FAILOVER_STATE_WAIT_START:
            sentinelFailoverWaitStart(ri);
            break;
        // select the replica to promote
        case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
            sentinelFailoverSelectSlave(ri);
            break;
        // send SLAVEOF NO ONE to it
        case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
            sentinelFailoverSendSlaveOfNoOne(ri);
            break;
        // wait until INFO shows the promotion
        case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
            sentinelFailoverWaitPromotion(ri);
            break;
        // point the other replicas at the new master
        case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
            sentinelFailoverReconfNextSlave(ri);
            break;
    }
}

This is the failover state machine. The figure below shows when each state transition happens.

[Figure: failover state machine]

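The failover begins with the state change made when the master becomes ODOWN. The first four states advance in one direction only; only the move into the last state is made in the INFO reply callback. The states in order: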

  • SENTINEL_FAILOVER_STATE_WAIT_START
void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
    char *leader;
    int isleader;

    /* Check if we are the leader for the failover epoch. */
    leader = sentinelGetLeader(ri, ri->failover_epoch);
    isleader = leader && strcasecmp(leader,sentinel.myid) == 0;
    sdsfree(leader);

    // not the leader, and not a forced failover
    if (!isleader && !(ri->flags & SRI_FORCE_FAILOVER)) {
        int election_timeout = SENTINEL_ELECTION_TIMEOUT;

        // election timeout: the smaller of SENTINEL_ELECTION_TIMEOUT and failover_timeout
        if (election_timeout > ri->failover_timeout)
            election_timeout = ri->failover_timeout;
        // abort if not elected in time
        if (mstime() - ri->failover_start_time > election_timeout) {
            sentinelEvent(LL_WARNING,"-failover-abort-not-elected",ri,"%@");
            sentinelAbortFailover(ri);
        }
        return;
    }
    sentinelEvent(LL_WARNING,"+elected-leader",ri,"%@");
    if (sentinel.simfailure_flags & SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION)
        sentinelSimFailureCrash();
    // elected: move to the next state
    ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
    ri->failover_state_change_time = mstime();
    sentinelEvent(LL_WARNING,"+failover-state-select-slave",ri,"%@");
}

sentinelGetLeader tallies the votes. If the winner is this Sentinel, the failover moves on to the next state.

    di = dictGetIterator(master->sentinels);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        if (ri->leader != NULL && ri->leader_epoch == sentinel.current_epoch)
            sentinelLeaderIncr(counters,ri->leader);
    }
    dictReleaseIterator(di);

This is the vote-counting loop. Only votes cast in the current epoch are counted.
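In Redis 6.0, sentinelGetLeader also requires the winner to have an absolute majority of all Sentinels (voters/2 + 1) and at least quorum votes. The sketch below assumes votes[] already holds one entry per Sentinel, including this one. In Redis, the local vote is cast through sentinelVoteLeader during the tally. tally is a hypothetical name.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical tally. votes[i] / vote_epochs[i]: the leader runid and
 * epoch reported by Sentinel i (NULL if none), one entry per Sentinel
 * including this one. Returns the winner, or NULL if there is none. */
static const char *tally(const char **votes, const uint64_t *vote_epochs,
                         int voters, uint64_t epoch, int quorum) {
    const char *winner = NULL;
    int max_votes = 0;

    assert(voters >= 1);
    for (int i = 0; i < voters; i++) {
        int count = 0;
        if (votes[i] == NULL || vote_epochs[i] != epoch) continue;
        for (int j = 0; j < voters; j++)       /* votes equal to votes[i] */
            if (votes[j] != NULL && vote_epochs[j] == epoch &&
                strcmp(votes[j], votes[i]) == 0)
                count++;
        if (count > max_votes) {
            max_votes = count;
            winner = votes[i];
        }
    }
    /* absolute majority of all voters, and at least `quorum` votes */
    if (winner && (max_votes < voters / 2 + 1 || max_votes < quorum))
        winner = NULL;
    return winner;
}
```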

  • SENTINEL_FAILOVER_STATE_SELECT_SLAVE
void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
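    sentinelRedisInstance *slave = sentinelSelectSlave(ri);
    /* ... if slave is NULL the failover is aborted; otherwise the replica is
     * flagged SRI_PROMOTED, stored in ri->promoted_slave, and the state moves
     * to SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE ... */
}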

sentinelSelectSlave chooses which replica to promote to master:

sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
    sentinelRedisInstance **instance =
        zmalloc(sizeof(instance[0])*dictSize(master->slaves));
    sentinelRedisInstance *selected = NULL;
    int instances = 0;
    dictIterator *di;
    dictEntry *de;
    mstime_t max_master_down_time = 0;

    if (master->flags & SRI_S_DOWN)
        max_master_down_time += mstime() - master->s_down_since_time;
    max_master_down_time += master->down_after_period * 10;

    di = dictGetIterator(master->slaves);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);
        mstime_t info_validity_time;

        // skip replicas that are down, disconnected, recently unreachable, or have priority 0
        if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue;
        if (slave->link->disconnected) continue;
        if (mstime() - slave->link->last_avail_time > SENTINEL_PING_PERIOD*5) continue;
        if (slave->slave_priority == 0) continue;

        // INFO must be fresher while the master is SDOWN
        if (master->flags & SRI_S_DOWN)
            info_validity_time = SENTINEL_PING_PERIOD*5;
        else
            info_validity_time = SENTINEL_INFO_PERIOD*3;
        // skip replicas with stale INFO, or whose link to the master has been down too long
        if (mstime() - slave->info_refresh > info_validity_time) continue;
        if (slave->master_link_down_time > max_master_down_time) continue;
        instance[instances++] = slave;
    }
    dictReleaseIterator(di);
    if (instances) {
        qsort(instance,instances,sizeof(sentinelRedisInstance*),
            compareSlavesForPromotion);
        selected = instance[0];
    }
    zfree(instance);
    return selected;
}

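The replicas that pass these filters are sorted with compareSlavesForPromotion, and the first one is picked. The order is: lowest slave-priority value first, then largest replication offset, then lexicographically smallest runid. Once a replica is selected, the state advances.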
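Below is a sketch of that selection order, re-implementing the comparison described above on a hypothetical replica struct. Only the priority-0 filter is kept. Redis compares runids with strcasecmp; strcmp is used here and gives the same order for lowercase hex runids.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical replica record with the fields the comparison uses. */
typedef struct {
    int priority;                    /* slave-priority; 0 = never promote */
    unsigned long long repl_offset;
    const char *runid;               /* may be NULL (older Redis) */
} replica;

/* Lower priority value first, then larger offset, then smaller runid;
 * a NULL runid sorts after any other. */
static int cmp_replicas(const void *a, const void *b) {
    const replica *ra = *(const replica *const *)a;
    const replica *rb = *(const replica *const *)b;
    if (ra->priority != rb->priority)
        return ra->priority - rb->priority;
    if (ra->repl_offset != rb->repl_offset)
        return ra->repl_offset > rb->repl_offset ? -1 : 1;
    if (ra->runid == NULL && rb->runid == NULL) return 0;
    if (ra->runid == NULL) return 1;
    if (rb->runid == NULL) return -1;
    return strcmp(ra->runid, rb->runid);
}

/* Drops priority-0 replicas (other filters omitted), then returns the
 * best candidate or NULL. Note: compacts and reorders cands in place. */
static replica *select_replica(replica **cands, size_t n) {
    size_t k = 0;
    assert(cands != NULL || n == 0);
    for (size_t i = 0; i < n; i++)
        if (cands[i]->priority != 0) cands[k++] = cands[i];
    if (k == 0) return NULL;
    qsort(cands, k, sizeof(cands[0]), cmp_replicas);
    return cands[0];
}
```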

  • SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE
void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
    int retval;

    // promoted replica unreachable: abort once failover_timeout expires
    if (ri->promoted_slave->link->disconnected) {
        if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
            sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
            sentinelAbortFailover(ri);
        }
        return;
    }

    retval = sentinelSendSlaveOf(ri->promoted_slave,NULL,0);
    if (retval != C_OK) return;
    sentinelEvent(LL_NOTICE, "+failover-state-wait-promotion",
        ri->promoted_slave,"%@");
    ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
    ri->failover_state_change_time = mstime();
}

sentinelSendSlaveOf is called without an address, which sends SLAVEOF NO ONE to the selected replica to turn it into a master. The state then advances.

  • SENTINEL_FAILOVER_STATE_WAIT_PROMOTION
void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
    if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
        sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
        sentinelAbortFailover(ri);
    }
}

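This step waits for the promotion to complete, or more precisely, for the INFO reply that shows the new role. If failover_timeout expires first, the failover is aborted.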

    if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
        // this is the replica being promoted, and the failover is in WAIT_PROMOTION
        if ((ri->flags & SRI_PROMOTED) &&
            (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
            (ri->master->failover_state ==
                SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
        {
            // adopt the failover epoch as the master's config_epoch and advance
            ri->master->config_epoch = ri->master->failover_epoch;
            ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
            ri->master->failover_state_change_time = mstime();
            sentinelFlushConfig();
            sentinelEvent(LL_WARNING,"+promoted-slave",ri,"%@");
            if (sentinel.simfailure_flags &
                SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION)
                sentinelSimFailureCrash();
            sentinelEvent(LL_WARNING,"+failover-state-reconf-slaves",
                ri->master,"%@");
            sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER,
                "start",ri->master->addr,ri->addr);
            sentinelForceHelloUpdateForMaster(ri->master);
        } else {
            /* ... a replica that turned master outside a failover is reconfigured back as a replica ... */
        }
    }

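The INFO callback may see that the promoted replica now reports the master role while the state machine is in WAIT_PROMOTION. In that case it sets config_epoch to the failover epoch, moves to the next state, and forces a hello update so the new configuration spreads.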

  • SENTINEL_FAILOVER_STATE_RECONF_SLAVES

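Finally, SLAVEOF is sent to the remaining replicas so that they replicate from the new master, at most parallel-syncs at a time. When this stage finishes, the state becomes SENTINEL_FAILOVER_STATE_UPDATE_CONFIG. sentinelHandleDictOfRedisInstances then calls sentinelFailoverSwitchToPromotedSlave, which completes the failover.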
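The transitions walked through above can be summarized as a small state machine. The enum values mirror the SENTINEL_FAILOVER_STATE_* names. The events are hypothetical labels for the conditions checked in each handler. As in Redis 6.0, an abort is only possible up to WAIT_PROMOTION.

```c
#include <assert.h>

/* Mirrors SENTINEL_FAILOVER_STATE_NONE ... _UPDATE_CONFIG. */
typedef enum {
    FO_NONE, FO_WAIT_START, FO_SELECT_SLAVE, FO_SEND_SLAVEOF_NOONE,
    FO_WAIT_PROMOTION, FO_RECONF_SLAVES, FO_UPDATE_CONFIG
} fo_state;

/* Hypothetical events standing for the conditions checked in each step. */
typedef enum {
    EV_START,         /* sentinelStartFailover on an ODOWN master */
    EV_ELECTED,       /* sentinelGetLeader returned our id */
    EV_SLAVE_CHOSEN,  /* sentinelSelectSlave found a candidate */
    EV_SLAVEOF_SENT,  /* SLAVEOF NO ONE was sent */
    EV_INFO_MASTER,   /* INFO shows the promoted replica as master */
    EV_SLAVES_DONE,   /* replicas reconfigured (or the failover timed out) */
    EV_ABORT          /* not elected, no good replica, or timeout */
} fo_event;

static fo_state next_state(fo_state s, fo_event e) {
    assert(s <= FO_UPDATE_CONFIG);
    /* sentinelAbortFailover is only valid up to WAIT_PROMOTION */
    if (e == EV_ABORT)
        return (s >= FO_WAIT_START && s <= FO_WAIT_PROMOTION) ? FO_NONE : s;
    switch (s) {
    case FO_NONE:               return e == EV_START        ? FO_WAIT_START : s;
    case FO_WAIT_START:         return e == EV_ELECTED      ? FO_SELECT_SLAVE : s;
    case FO_SELECT_SLAVE:       return e == EV_SLAVE_CHOSEN ? FO_SEND_SLAVEOF_NOONE : s;
    case FO_SEND_SLAVEOF_NOONE: return e == EV_SLAVEOF_SENT ? FO_WAIT_PROMOTION : s;
    case FO_WAIT_PROMOTION:     return e == EV_INFO_MASTER  ? FO_RECONF_SLAVES : s;
    case FO_RECONF_SLAVES:      return e == EV_SLAVES_DONE  ? FO_UPDATE_CONFIG : s;
    default:                    return s;  /* UPDATE_CONFIG: handled by the caller */
    }
}
```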

Summary

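The failover flow looks complicated, especially the INFO callback, which handles many different cases. Whenever the master fails, the Sentinels vote to elect one of them as leader, and the leader picks a healthy replica to become the new master. The election borrows Raft's leader election and is aborted by a timeout. The state machine makes the transitions of the whole process easy to follow.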
