前言
本文介绍redis sentinel机制的原理和实现方式,redis可以配置主从模式来进行复制,但是一旦主服务器出问题了需要手工切换服务器,所以主从模式下并没有实现高可用,在切换的过程中还是会出现短暂的不可用。redis使用了sentinel(哨兵)来监控主从节点,当监视到主服务器不可用时会触发故障转移,从而自动切换服务器实现自动化。
正文
在配置了主从模式下,可以配置多个Sentinel节点来对主从节点进行监控
- 监控:Sentinel会定期发送命令给其他节点,以确保节点是否存活
- 故障转移:当多个Sentinel认为主节点下线的时候,会选出一个Sentinel节点来负责故障转移,并由它选出一个从节点升级为新的主节点
- 通知:在选出主节点后会将结果进行通知
配置和初始化
sentinel monitor <master-name> <ip> <port> <count>
复制代码
表示当前需要监控的节点信息,count(即quorum)表示至少有多少个sentinel认为该节点下线时,才视为客观下线,推荐设置为sentinel数量的二分之一+1
sentinel down-after-milliseconds <master-name> 100000
复制代码
设置对应机器的超时时间,超过这个时间则认为机器下线
sentinel failover-timeout mymaster 180000
复制代码
表示故障转移的时间,超过该时间认为故障转移失败
char *sentinelHandleConfiguration(char **argv, int argc) {
sentinelRedisInstance *ri;
if (!strcasecmp(argv[0],"monitor") && argc == 5) {
/* monitor <name> <host> <port> <quorum> */
//quorum表示多少个哨兵数量任务服务不可用 才进行failover
int quorum = atoi(argv[4]);
if (quorum <= 0) return "Quorum must be 1 or greater.";
//创建master实例
if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
atoi(argv[3]),quorum,NULL) == NULL)
{
switch(errno) {
case EBUSY: return "Duplicated master name.";
case ENOENT: return "Can't resolve master instance hostname.";
case EINVAL: return "Invalid port number";
}
}
//down-after-milliseconds表示一个服务器多少毫秒内没有应答 就认为它下线
} else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {
/* down-after-milliseconds <name> <milliseconds> */
//检查是否存在实例
ri = sentinelGetMasterByName(argv[1]);
if (!ri) return "No such master with specified name.";
ri->down_after_period = atoi(argv[2]);
if (ri->down_after_period <= 0)
return "negative or zero time parameter.";
sentinelPropagateDownAfterPeriod(ri);
//故障切换的时间 如果超时则视为故障切换失败
} else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) {
/* failover-timeout <name> <milliseconds> */
ri = sentinelGetMasterByName(argv[1]);
if (!ri) return "No such master with specified name.";
ri->failover_timeout = atoi(argv[2]);
if (ri->failover_timeout <= 0)
return "negative or zero time parameter.";
//指定多个Redis服务同步新的主机
} else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) {
/* parallel-syncs <name> <milliseconds> */
ri = sentinelGetMasterByName(argv[1]);
if (!ri) return "No such master with specified name.";
ri->parallel_syncs = atoi(argv[2]);
//发生异常时调用的脚本
}
复制代码
sentinelHandleConfiguration代码中印证了上述配置,会调用createSentinelRedisInstance创建一个主服务器实例
sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
sentinelRedisInstance *ri;
sentinelAddr *addr;
dict *table = NULL;
char slavename[NET_PEER_ID_LEN], *sdsname;
serverAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
serverAssert((flags & SRI_MASTER) || master != NULL);
/* Check address validity. */
addr = createSentinelAddr(hostname,port);
if (addr == NULL) return NULL;
/* For slaves use ip:port as name. */
if (flags & SRI_SLAVE) {
anetFormatAddr(slavename, sizeof(slavename), hostname, port);
name = slavename;
}
if (flags & SRI_MASTER) table = sentinel.masters;
else if (flags & SRI_SLAVE) table = master->slaves;
else if (flags & SRI_SENTINEL) table = master->sentinels;
//判断设置的名称是否重复
sdsname = sdsnew(name);
if (dictFind(table,sdsname)) {
releaseSentinelAddr(addr);
sdsfree(sdsname);
errno = EBUSY;
return NULL;
}
复制代码
可以看到sentinel.masters用于存储主服务器实例,而主服务器实例里面有slaves和sentinels的字典来存储对应的从服务和哨兵服务
/* Put the server into Sentinel mode: the regular command table is emptied and
 * refilled with the entries of sentinelcmds, then the global `sentinel` state
 * is reset to its initial values. */
void initSentinel(void) {
    const unsigned int ncmds = sizeof(sentinelcmds)/sizeof(sentinelcmds[0]);
    unsigned int idx = 0;

    /* Remove every existing command, then register only the Sentinel ones. */
    dictEmpty(server.commands,NULL);
    while (idx < ncmds) {
        struct redisCommand *cmd = &sentinelcmds[idx];
        int retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);

        serverAssert(retval == DICT_OK);
        if (populateCommandTableParseFlags(cmd,cmd->sflags) == C_ERR)
            serverPanic("Unsupported command flag");
        idx++;
    }

    /* Epoch and table of monitored masters. */
    sentinel.current_epoch = 0;
    sentinel.masters = dictCreate(&instancesDictType,NULL);

    /* TILT mode bookkeeping. */
    sentinel.tilt = 0;
    sentinel.tilt_start_time = 0;
    sentinel.previous_time = mstime();

    /* Script execution queue. */
    sentinel.running_scripts = 0;
    sentinel.scripts_queue = listCreate();

    /* Announced address, failure simulation and script reconfig policy. */
    sentinel.announce_ip = NULL;
    sentinel.announce_port = 0;
    sentinel.simfailure_flags = SENTINEL_SIMFAILURE_NONE;
    sentinel.deny_scripts_reconfig = SENTINEL_DEFAULT_DENY_SCRIPTS_RECONFIG;

    /* The Sentinel ID starts zeroed. */
    memset(sentinel.myid,0,sizeof(sentinel.myid));
}
复制代码
initSentinel用于初始化自己,current_epoch和raft里面的term比较像,表示当前任期。
原理
sentinel通过定时任务的方式来定期执行监控操作,由以下几点组成:
节点通信
- 每隔1S发送ping命令:ping命令用于连通性判断,会记录发送时间和接收时间
- 每隔2S发送hello命令:hello命令用于发送自己的信息和自己记录的master信息,用于在各服务器之间更新master以及epoch信息
- 每隔10s发送info命令:info命令用于向对方请求获取节点信息,可以通过此方式发现slave服务器等
下线机制
- 主观下线:ping命令超时了,则主观认为下线了
- 客观下线:当主观认为下线了以后,则询问其他哨兵服务器是否下线
故障转移
当主服务客观下线后,sentinel会向其他sentinel发送选主请求,选出由谁来负责故障转移,选举方式和raft算法的leader选举类似。选举成功后,负责故障转移的sentinel会选择一个从服务器升级为主服务器
源码
void sentinelTimer(void) {
//检查上一次调用时间和当前间隔 是否进入被动模式
sentinelCheckTiltCondition();
sentinelHandleDictOfRedisInstances(sentinel.masters);
sentinelRunPendingScripts();
sentinelCollectTerminatedScripts();
sentinelKillTimedoutScripts();
复制代码
通过该定时任务执行,逐步分析
/* Compare the current time with the time recorded by the previous call.
 * When the difference is negative or larger than SENTINEL_TILT_TRIGGER,
 * TILT mode is entered and its start time recorded. The reference time is
 * refreshed on every call. */
void sentinelCheckTiltCondition(void) {
    mstime_t now = mstime();
    mstime_t delta = now - sentinel.previous_time;
    int timing_ok = (delta >= 0 && delta <= SENTINEL_TILT_TRIGGER);

    if (!timing_ok) {
        sentinel.tilt = 1;
        sentinel.tilt_start_time = mstime();
        sentinelEvent(LL_WARNING,"+tilt",NULL,"#tilt mode entered");
    }
    sentinel.previous_time = mstime();
}
复制代码
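sentinelCheckTiltCondition用于判断距离上一次定时任务执行的时间间隔,如果间隔为负数或者超过SENTINEL_TILT_TRIGGER(2s),就进入tilt模式。出现这种情况通常说明系统时钟被调整过,或者进程被长时间阻塞,此时该sentinel基于时间做出的判断不可信。从后面的sentinelHandleRedisInstance可以看到,tilt模式下仍然会连接实例并发送周期命令,但不会执行主观/客观下线判断和故障转移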
/* Run sentinelHandleRedisInstance() on every instance stored in `instances`.
 * For instances flagged as masters, the function recurses into their slaves
 * and sentinels dictionaries. A master found in the
 * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state is remembered and, once the
 * iteration is over, handed to sentinelFailoverSwitchToPromotedSlave(). */
void sentinelHandleDictOfRedisInstances(dict *instances) {
    sentinelRedisInstance *pending_switch = NULL;
    dictIterator *iter = dictGetIterator(instances);
    dictEntry *entry;

    for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
        sentinelRedisInstance *inst = dictGetVal(entry);

        sentinelHandleRedisInstance(inst);
        if (!(inst->flags & SRI_MASTER)) continue;

        /* Masters also own the instances attached to them. */
        sentinelHandleDictOfRedisInstances(inst->slaves);
        sentinelHandleDictOfRedisInstances(inst->sentinels);

        /* Defer the switch until the iteration has finished. */
        if (inst->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG)
            pending_switch = inst;
    }

    if (pending_switch != NULL)
        sentinelFailoverSwitchToPromotedSlave(pending_switch);
    dictReleaseIterator(iter);
}
复制代码
sentinelHandleDictOfRedisInstances方法是遍历master服务器,并对它以及slave和sentinel执行sentinelHandleRedisInstance
/* Periodic work for a single monitored instance.
 *
 * Reconnecting and sending the periodic commands always happens. Everything
 * after that is skipped while TILT mode is active and younger than
 * SENTINEL_TILT_PERIOD; once that period has elapsed, TILT mode is cleared
 * and processing continues. The objective-down check, failover start,
 * failover state machine and the queries to other Sentinels run only for
 * instances flagged SRI_MASTER. */
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
    /* ========== Monitoring half ========== */
    sentinelReconnectInstance(ri);
    sentinelSendPeriodicCommands(ri);

    /* ========== Acting half ========== */
    if (sentinel.tilt) {
        /* Still inside the TILT window: do not act. */
        if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
        sentinel.tilt = 0;
        sentinelEvent(LL_WARNING,"-tilt",NULL,"#tilt mode exited");
    }

    /* Every kind of instance is checked for the subjectively-down state. */
    sentinelCheckSubjectivelyDown(ri);

    if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
        /* Masters and slaves: nothing to do here. */
    }

    /* The remaining steps apply to masters only. */
    if (!(ri->flags & SRI_MASTER)) return;

    sentinelCheckObjectivelyDown(ri);
    if (sentinelStartFailoverIfNeeded(ri)) {
        /* A failover was just started: ask the other Sentinels right away. */
        sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
    }
    sentinelFailoverStateMachine(ri);
    sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
}
复制代码
sentinelHandleRedisInstance流程
创建连接
首先执行sentinelReconnectInstance命令,如果还没有进行连接,对该实例进行异步连接以及PUB/SUB连接,该方法源码不放出来了,根据host和port连接
发送命令
sentinelSendPeriodicCommands发送INFO、PING、HELLO命令
/* Send the periodic INFO, PING and hello messages to `ri`, each one only
 * when its own period has elapsed.
 *
 * Nothing is sent when the link is disconnected, or when the number of
 * pending commands has reached SENTINEL_MAX_PENDING_COMMANDS multiplied by
 * the link reference count. */
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
    mstime_t now = mstime();
    mstime_t info_period, ping_period;
    int fast_info, info_due, ping_due;

    if (ri->link->disconnected) return;
    if (ri->link->pending_commands >=
        SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return;

    /* A slave is polled with INFO every second (instead of every
     * SENTINEL_INFO_PERIOD) when its master is objectively down or in
     * failover, or when the slave has a non-zero master_link_down_time. */
    fast_info = (ri->flags & SRI_SLAVE) &&
                ((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) ||
                 (ri->master_link_down_time != 0));
    info_period = fast_info ? 1000 : SENTINEL_INFO_PERIOD;

    /* The ping period is the smaller of down_after_period and
     * SENTINEL_PING_PERIOD. */
    ping_period = (ri->down_after_period > SENTINEL_PING_PERIOD) ?
                  SENTINEL_PING_PERIOD : ri->down_after_period;

    /* INFO is never sent to instances flagged SRI_SENTINEL. */
    info_due = !(ri->flags & SRI_SENTINEL) &&
               (ri->info_refresh == 0 ||
                (now - ri->info_refresh) > info_period);
    if (info_due) {
        int retval = redisAsyncCommand(ri->link->cc,
            sentinelInfoReplyCallback, ri, "%s",
            sentinelInstanceMapCommand(ri,"INFO"));
        if (retval == C_OK) ri->link->pending_commands++;
    }

    /* PING: the last pong must be older than a full period and the last
     * ping older than half a period. */
    ping_due = (now - ri->link->last_pong_time) > ping_period &&
               (now - ri->link->last_ping_time) > ping_period/2;
    if (ping_due) sentinelSendPing(ri);

    /* Hello message, once every SENTINEL_PUBLISH_PERIOD. */
    if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD)
        sentinelSendHello(ri);
}
复制代码
流程如下:
- 命令只能堆积100次,如果发送了100次都没有回复则不发送
- INFO命令只向Master和Slave发送,本身10S执行一次,但是如果主节点下线或者在执行故障转移,则改为1S一次来加快信息的获取
- ping命令1s执行一次
- hello命令2s执行一次
ping命令
/* Queue a PING for `ri` on its command link.
 *
 * Returns 1 when the command was queued, 0 otherwise. On success the pending
 * command counter and last_ping_time are updated; act_ping_time is set only
 * when it is currently zero. */
int sentinelSendPing(sentinelRedisInstance *ri) {
    int retval = redisAsyncCommand(ri->link->cc,
        sentinelPingReplyCallback, ri, "%s",
        sentinelInstanceMapCommand(ri,"PING"));

    if (retval != C_OK) return 0;

    ri->link->pending_commands++;
    ri->link->last_ping_time = mstime();
    /* The active ping time is refreshed only if the pong for the previous
     * ping was received; otherwise we are still waiting since the first
     * ping that got no reply. */
    if (ri->link->act_ping_time == 0)
        ri->link->act_ping_time = ri->link->last_ping_time;
    return 1;
}
复制代码
发送ping命令的代码,发送成功后会更新last_ping_time(如果act_ping_time为0也会一并设置),对方收到ping命令后会进行回复,接收方法太简单不用看了,直接看sentinelPingReplyCallback方法
/* Reply handler for the PING sent by sentinelSendPing().
 *
 * Does nothing when the reply or the link is NULL. Otherwise:
 *  - PONG, LOADING and MASTERDOWN replies refresh last_avail_time and clear
 *    act_ping_time;
 *  - a BUSY reply from an instance flagged SRI_S_DOWN, for which no
 *    SCRIPT KILL was sent yet, triggers a SCRIPT KILL;
 *  - last_pong_time is refreshed for any non-NULL reply. */
void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = privdata;
    instanceLink *link = c->data;
    redisReply *r = reply;

    if (reply == NULL || link == NULL) return;
    link->pending_commands--;

    if (r->type == REDIS_REPLY_STATUS || r->type == REDIS_REPLY_ERROR) {
        int acceptable = !strncmp(r->str,"PONG",4) ||
                         !strncmp(r->str,"LOADING",7) ||
                         !strncmp(r->str,"MASTERDOWN",10);

        if (acceptable) {
            link->last_avail_time = mstime();
            link->act_ping_time = 0;
        } else if (!strncmp(r->str,"BUSY",4) &&
                   (ri->flags & SRI_S_DOWN) &&
                   !(ri->flags & SRI_SCRIPT_KILL_SENT))
        {
            /* The flag is set whether or not the command could be queued. */
            int queued = redisAsyncCommand(ri->link->cc,
                sentinelDiscardReplyCallback, ri,
                "%s KILL",
                sentinelInstanceMapCommand(ri,"SCRIPT"));
            if (queued == C_OK) ri->link->pending_commands++;
            ri->flags |= SRI_SCRIPT_KILL_SENT;
        }
    }
    link->last_pong_time = mstime();
}
复制代码
只要收到了回复就会更新last_pong_time,但只有在收到有效回复(PONG、LOADING、MASTERDOWN)时才会更新last_avail_time并把act_ping_time清零,表示该实例当前可用
hello命令
snprintf(payload,sizeof(payload),
"%s,%d,%s,%llu," //自己信息
"%s,%s,%d,%llu", //当前master信息
announce_ip, announce_port, sentinel.myid,
(unsigned long long) sentinel.current_epoch,
/* --- */
master->name,master_addr->ip,master_addr->port,
(unsigned long long) master->config_epoch);
retval = redisAsyncCommand(ri->link->cc,
sentinelPublishReplyCallback, ri, "%s %s %s",
sentinelInstanceMapCommand(ri,"PUBLISH"),
SENTINEL_HELLO_CHANNEL,payload);
if (retval != C_OK) return C_ERR;
ri->link->pending_commands++;
return C_OK;
复制代码
hello命令的部分代码,hello会发送自己的ip、port、myid、current_epoch,以及自己所记录的master的name、ip、port和config_epoch
/* PUBLISH handler used by Sentinel. Only messages addressed to
 * SENTINEL_HELLO_CHANNEL are accepted: they are passed to
 * sentinelProcessHelloMessage() and answered with the integer 1. Any other
 * channel is answered with an error. */
void sentinelPublishCommand(client *c) {
    if (strcmp(c->argv[1]->ptr,SENTINEL_HELLO_CHANNEL) == 0) {
        sentinelProcessHelloMessage(c->argv[2]->ptr,sdslen(c->argv[2]->ptr));
        addReplyLongLong(c,1);
    } else {
        addReplyError(c, "Only HELLO messages are accepted by Sentinel instances.");
    }
}
复制代码
hello命令的接受命令,看sentinelProcessHelloMessage
/* Parse a hello message and update the local view of Sentinels and masters.
 *
 * hello:     comma separated payload (see the format below).
 * hello_len: length of the payload in bytes.
 *
 * Messages that do not split into exactly 8 tokens, or that name a master
 * not found by sentinelGetMasterByName(), are ignored. The token array is
 * always released before returning. */
void sentinelProcessHelloMessage(char *hello, int hello_len) {
/* Message format:
* 0=ip,1=port,2=runid,3=current_epoch,4=master_name,
* 5=master_ip,6=master_port,7=master_config_epoch. */
int numtokens, port, removed, master_port;
uint64_t current_epoch, master_config_epoch;
char **token = sdssplitlen(hello, hello_len, ",", 1, &numtokens);
sentinelRedisInstance *si, *master;
if (numtokens == 8) {
/* Look up the master the message refers to by name. */
master = sentinelGetMasterByName(token[4]);
if (!master) goto cleanup;
port = atoi(token[1]);
master_port = atoi(token[6]);
/* Look up the sending Sentinel among the master's sentinels, by
 * address and runid. */
si = getSentinelRedisInstanceByAddrAndRunID(
master->sentinels,token[0],port,token[2]);
current_epoch = strtoull(token[3],NULL,10);
master_config_epoch = strtoull(token[7],NULL,10);
if (!si) {
/* No match on address and runid. First call
 * removeMatchingSentinelFromMaster() with the announced runid. */
removed = removeMatchingSentinelFromMaster(master,token[2]);
if (removed) {
sentinelEvent(LL_NOTICE,"+sentinel-address-switch",master,
"%@ ip %s port %d for %s", token[0],port,token[2]);
} else {
/* Nothing was removed: look for a Sentinel with the same address
 * (runid passed as NULL). If found, its port is set to 0, which the
 * event below reports as an invalid address. */
sentinelRedisInstance *other =
getSentinelRedisInstanceByAddrAndRunID(
master->sentinels, token[0],port,NULL);
if (other) {
sentinelEvent(LL_NOTICE,"+sentinel-invalid-addr",other,"%@");
other->addr->port = 0;
/* NOTE(review): presumably propagates the changed address to the
 * other masters that know this Sentinel; confirm in the helper. */
sentinelUpdateSentinelAddressInAllMasters(other);
}
}
/* Create the Sentinel instance under this master, named by runid. */
si = createSentinelRedisInstance(token[2],SRI_SENTINEL,
token[0],port,master->quorum,master);
if (si) {
if (!removed) sentinelEvent(LL_NOTICE,"+sentinel",si,"%@");
si->runid = sdsnew(token[2]);
sentinelTryConnectionSharing(si);
if (removed) sentinelUpdateSentinelAddressInAllMasters(si);
sentinelFlushConfig();
}
}
/* Adopt the sender's current_epoch when it is greater than ours. */
if (current_epoch > sentinel.current_epoch) {
sentinel.current_epoch = current_epoch;
sentinelFlushConfig();
sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
(unsigned long long) sentinel.current_epoch);
}
/* Adopt the sender's master configuration when its config epoch is
 * greater than ours. The master is reset to the announced address only
 * when that address differs from the current one. */
if (si && master->config_epoch < master_config_epoch) {
master->config_epoch = master_config_epoch;
if (master_port != master->addr->port ||
strcmp(master->addr->ip, token[5]))
{
sentinelAddr *old_addr;
sentinelEvent(LL_WARNING,"+config-update-from",si,"%@");
sentinelEvent(LL_WARNING,"+switch-master",
master,"%s %s %d %s %d",
master->name,
master->addr->ip, master->addr->port,
token[5], master_port);
/* Keep a copy of the old address: it is passed to the reconfiguration
 * script after the master address has been changed. */
old_addr = dupSentinelAddr(master->addr);
sentinelResetMasterAndChangeAddress(master, token[5], master_port);
sentinelCallClientReconfScript(master,
SENTINEL_OBSERVER,"start",
old_addr,master->addr);
releaseSentinelAddr(old_addr);
}
}
/* Update the state of the Sentinel. */
if (si) si->last_hello_time = mstime();
}
cleanup:
sdsfreesplitres(token,numtokens);
}
复制代码
流程如下:
- 尝试根据发送的信息获取master的实例以及sentinel实例
- 如果当前master下没有匹配的sentinel实例,会先尝试删除runid相同的旧实例;如果没有删除任何实例,则把地址相同的sentinel标记为无效地址。之后重新创建该sentinel实例,确保信息一致
- 和raft一样,如果对方的current_epoch比自己大,更新current_epoch
- 如果对方发来的master的config_epoch比自己保存的大,说明master的配置已经更新(地址可能已经更换),需要更新自己保存的信息
++hello命令的callback代码只涉及到时间的更新,这里不放出来了++
info命令
if (defsections || allsections || !strcasecmp(section,"sentinel")) {
dictIterator *di;
dictEntry *de;
int master_id = 0;
if (sections++) info = sdscat(info,"\r\n");
info = sdscatprintf(info,
"# Sentinel\r\n"
"sentinel_masters:%lu\r\n"
"sentinel_tilt:%d\r\n"
"sentinel_running_scripts:%d\r\n"
"sentinel_scripts_queue_length:%ld\r\n"
"sentinel_simulate_failure_flags:%lu\r\n",
dictSize(sentinel.masters),
sentinel.tilt,
sentinel.running_scripts,
listLength(sentinel.scripts_queue),
sentinel.simfailure_flags);
di = dictGetIterator(sentinel.masters);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
char *status = "ok";
if (ri->flags & SRI_O_DOWN) status = "odown";
else if (ri->flags & SRI_S_DOWN) status = "sdown";
info = sdscatprintf(info,
"master%d:name=%s,status=%s,address=%s:%d,"
"slaves=%lu,sentinels=%lu\r\n",
master_id++, ri->name, status,
ri->addr->ip, ri->addr->port,
dictSize(ri->slaves),
dictSize(ri->sentinels)+1);
}
dictReleaseIterator(di);
}
addReplyBulkSds(c, info);
}
复制代码
info命令的部分接受代码,收到了info命令后,需要向对方发送以上信息
/* Reply handler for the periodic INFO command. Does nothing when the reply
 * or the link is NULL. Otherwise the pending command counter is decremented
 * and, for string replies only, the text is passed to
 * sentinelRefreshInstanceInfo(). */
void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = privdata;
    instanceLink *link = c->data;
    redisReply *r = reply;

    if (reply == NULL || link == NULL) return;
    link->pending_commands--;

    if (r->type != REDIS_REPLY_STRING) return;
    sentinelRefreshInstanceInfo(ri,r->str);
}
复制代码
info命令的Callback方法,sentinelRefreshInstanceInfo代码很长,其实干了以下几件事:
- 如果是主节点
if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
atoi(port), ri->quorum, ri)) != NULL)
{
sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
sentinelFlushConfig();
}
}
复制代码
提取从节点的host和port,如果没有则创建从节点,通过info来获取从节点
- 如果info是从节点:则更新从节点信息
- 当前记录的信息有可能和获取的不一样,如果当前记录是主节点,但是返回的是从节点,那么就需要执行slaveOf
- 如果返回的从节点,但是从节点记录的主节点和我们记录的主节点不一致,需要执行slaveOf
- 如果返回节点变成了主节点,触发转移流程
主观下线检查
/* Decide whether `ri` is subjectively down (SRI_S_DOWN) from this Sentinel's
 * point of view. Along the way, the command link and the Pub/Sub link are
 * closed when they look stale. */
void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
mstime_t elapsed = 0;
/* Time without a valid reply: measured from the pending ping when there is
 * one, otherwise from the last available time when the link is
 * disconnected. It stays 0 in every other case. */
if (ri->link->act_ping_time)
elapsed = mstime() - ri->link->act_ping_time;
else if (ri->link->disconnected)
elapsed = mstime() - ri->link->last_avail_time;
/* Check whether the links should be closed.
 * 1) Command link: it exists, was connected more than
 *    SENTINEL_MIN_LINK_RECONNECT_PERIOD ago, a ping is pending, and both
 *    the pending ping and the last pong are older than half of
 *    down_after_period. */
if (ri->link->cc &&
(mstime() - ri->link->cc_conn_time) >
SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
ri->link->act_ping_time != 0 &&
(mstime() - ri->link->act_ping_time) > (ri->down_after_period/2) &&
(mstime() - ri->link->last_pong_time) > (ri->down_after_period/2))
{
instanceLinkCloseConnection(ri->link,ri->link->cc);
}
/* 2) Pub/Sub link: it exists, was connected more than
 *    SENTINEL_MIN_LINK_RECONNECT_PERIOD ago, and had no activity for more
 *    than three times SENTINEL_PUBLISH_PERIOD. */
if (ri->link->pc &&
(mstime() - ri->link->pc_conn_time) >
SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
(mstime() - ri->link->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
{
instanceLinkCloseConnection(ri->link,ri->link->pc);
}
/* Set or clear SRI_S_DOWN. The instance is considered down when:
 * 1) no valid reply arrived for more than down_after_period, or
 * 2) we consider it a master, it reports itself as a slave, and that
 *    report is older than down_after_period plus two INFO periods. */
if (elapsed > ri->down_after_period ||
(ri->flags & SRI_MASTER &&
ri->role_reported == SRI_SLAVE &&
mstime() - ri->role_reported_time >
(ri->down_after_period+SENTINEL_INFO_PERIOD*2)))
{
/* Is subjectively down */
if ((ri->flags & SRI_S_DOWN) == 0) {
sentinelEvent(LL_WARNING,"+sdown",ri,"%@");
ri->s_down_since_time = mstime();
ri->flags |= SRI_S_DOWN;
}
} else {
/* Is subjectively up */
if (ri->flags & SRI_S_DOWN) {
sentinelEvent(LL_WARNING,"-sdown",ri,"%@");
/* Leaving the down state also clears the SCRIPT KILL marker. */
ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
}
}
}
复制代码
主观下线有两个判断依据:
- 超过时间没有回复命令
- 当前哨兵认为该实例是主服务 但是服务器向哨兵报告它成为从服务 但是超时了还没有进行角色转换
其中一个判断成功,则设置SDOWN
客观下线
/* Decide whether `master` is objectively down (SRI_O_DOWN).
 *
 * This only happens when the master is already subjectively down for us.
 * Our own opinion counts as one vote, plus one for every known Sentinel
 * flagged SRI_MASTER_DOWN. When the votes reach master->quorum the flag is
 * set, otherwise it is cleared. Events are emitted only on transitions. */
void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
    unsigned int quorum = 0, odown = 0;
    int was_odown;

    if (master->flags & SRI_S_DOWN) {
        dictIterator *iter = dictGetIterator(master->sentinels);
        dictEntry *entry;

        quorum = 1; /* Our own vote. */
        for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
            sentinelRedisInstance *other = dictGetVal(entry);
            if (other->flags & SRI_MASTER_DOWN) quorum++;
        }
        dictReleaseIterator(iter);
        if (quorum >= master->quorum) odown = 1;
    }

    /* Apply the outcome, acting only when the state actually changes. */
    was_odown = (master->flags & SRI_O_DOWN) != 0;
    if (odown && !was_odown) {
        sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d",
            quorum, master->quorum);
        master->flags |= SRI_O_DOWN;
        master->o_down_since_time = mstime();
    } else if (!odown && was_odown) {
        sentinelEvent(LL_WARNING,"-odown",master,"%@");
        master->flags &= ~SRI_O_DOWN;
    }
}
复制代码
这里遍历每台sentinels的flags是否有SRI_MASTER_DOWN数量,超过一定数量则会被判断客观下线
是否进行故障转移
/* Start a failover for `master` when all of the following hold:
 *  1) the master is flagged SRI_O_DOWN;
 *  2) no failover is already in progress;
 *  3) at least failover_timeout*2 milliseconds have passed since
 *     failover_start_time.
 *
 * Returns 1 when a failover was started, 0 otherwise. When only condition 3
 * fails, the earliest time for the next attempt is logged once per
 * failover_start_time value. */
int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
    if (!(master->flags & SRI_O_DOWN) ||
        (master->flags & SRI_FAILOVER_IN_PROGRESS)) return 0;

    if (mstime() - master->failover_start_time >= master->failover_timeout*2) {
        sentinelStartFailover(master);
        return 1;
    }

    /* The previous attempt is too recent: report when we may try again. */
    if (master->failover_delay_logged != master->failover_start_time) {
        time_t not_before = (master->failover_start_time +
                             master->failover_timeout*2) / 1000;
        char when[26];

        ctime_r(&not_before,when);
        when[24] = '\0'; /* Remove newline. */
        master->failover_delay_logged = master->failover_start_time;
        serverLog(LL_WARNING,
            "Next failover delay: I will not start a failover before %s",
            when);
    }
    return 0;
}
复制代码
只有客观下线才能进入故障转移。客观下线的判定是在sentinelCheckObjectivelyDown中完成的,而它所依据的其他哨兵的SRI_MASTER_DOWN标记,则是通过sentinelAskMasterStateToOtherSentinels向其他哨兵询问、并在回调中设置的
if (sentinelStartFailoverIfNeeded(ri))
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
sentinelFailoverStateMachine(ri);
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
复制代码
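这里首先判断是否需要故障转移,如果需要,就以SENTINEL_ASK_FORCED调用sentinelAskMasterStateToOtherSentinels,跳过询问间隔的限制,立即向其他哨兵发送请求(此时failover_state已经不是NONE,所以会携带自己的runid来请求投票);状态机执行之后还会以普通方式再调用一次sentinelAskMasterStateToOtherSentinels,其他哨兵对主节点是否下线的看法就是通过这个方法收集的,我们顺着刚才的方法往下走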
/* Put `master` into the first failover state. The global current epoch is
 * incremented and becomes the failover epoch of this master. The failover
 * start time is the current time plus a random amount below
 * SENTINEL_MAX_DESYNC. */
void sentinelStartFailover(sentinelRedisInstance *master) {
    int desync;

    serverAssert(master->flags & SRI_MASTER);

    /* New epoch for this failover attempt. */
    sentinel.current_epoch++;
    master->failover_epoch = sentinel.current_epoch;

    master->flags |= SRI_FAILOVER_IN_PROGRESS;
    master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;

    sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
        (unsigned long long) sentinel.current_epoch);
    sentinelEvent(LL_WARNING,"+try-failover",master,"%@");

    desync = rand() % SENTINEL_MAX_DESYNC;
    master->failover_start_time = mstime() + desync;
    master->failover_state_change_time = mstime();
}
复制代码
开始故障转移,设置flags以及增加current_epoch
/* Ask every Sentinel known for `master` whether it also sees the master as
 * down, using "SENTINEL is-master-down-by-addr".
 *
 * master: the monitored master the question is about.
 * flags:  SENTINEL_ASK_FORCED sends the request even when the last reply
 *         from that Sentinel is more recent than SENTINEL_ASK_PERIOD;
 *         SENTINEL_NO_FLAGS applies that rate limit.
 *
 * For each Sentinel, information older than SENTINEL_ASK_PERIOD*5 is first
 * discarded (SRI_MASTER_DOWN cleared, leader reset). No request is sent when
 * we do not consider the master subjectively down, or when the link to that
 * Sentinel is disconnected.
 *
 * The last command argument is our own runid while a failover is in any
 * state above SENTINEL_FAILOVER_STATE_NONE, otherwise "*". */
void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetIterator(master->sentinels);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
        char port[32];
        int retval;

        /* The state reported by this Sentinel is too old: forget it. */
        if (elapsed > SENTINEL_ASK_PERIOD*5) {
            ri->flags &= ~SRI_MASTER_DOWN;
            sdsfree(ri->leader);
            ri->leader = NULL;
        }

        /* Only ask when the master is subjectively down from our side. */
        if ((master->flags & SRI_S_DOWN) == 0) continue;
        /* Cannot ask over a disconnected link. */
        if (ri->link->disconnected) continue;
        /* Unless forced, do not ask more often than SENTINEL_ASK_PERIOD. */
        if (!(flags & SENTINEL_ASK_FORCED) &&
            mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
            continue;

        /* SENTINEL is-master-down-by-addr <ip> <port> <current_epoch> <runid>
         *
         * current_epoch is cast explicitly so that the argument type matches
         * the %llu conversion on every platform. */
        ll2string(port,sizeof(port),master->addr->port);
        retval = redisAsyncCommand(ri->link->cc,
                    sentinelReceiveIsMasterDownReply, ri,
                    "%s is-master-down-by-addr %s %s %llu %s",
                    sentinelInstanceMapCommand(ri,"SENTINEL"),
                    master->addr->ip, port,
                    (unsigned long long) sentinel.current_epoch,
                    (master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
                    sentinel.myid : "*");
        if (retval == C_OK) ri->link->pending_commands++;
    }
    dictReleaseIterator(di);
}
复制代码
这里遍历了每一个sentinel,然后发送命令
sentinel is-master-down-by-addr <ip> <port> <current_epoch> <runid>
复制代码
该命令会询问其他sentinel主节点是否下线,并且会携带一个runid,这个runid可以是*或者自己的id,当为自己的runid时表示需要其他sentinel给自己投票,选出一个负责故障转移的机器来协调重新选主
ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
c->argv[2]->ptr,port,NULL);
//是否下线
if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
(ri->flags & SRI_MASTER))
isdown = 1;
//如果指定了runid 投票
if (ri && ri->flags & SRI_MASTER && strcasecmp(c->argv[5]->ptr,"*")) {
leader = sentinelVoteLeader(ri,(uint64_t)req_epoch,
c->argv[5]->ptr,
&leader_epoch);
}
/* Reply with a three-elements multi-bulk reply:
* down state, leader, vote epoch. */
addReplyArrayLen(c,3);
addReply(c, isdown ? shared.cone : shared.czero);
addReplyBulkCString(c, leader ? leader : "*");
addReplyLongLong(c, (long long)leader_epoch);
if (leader) sdsfree(leader);
复制代码
这是is-master-down-by-addr的接收方法,如果当前sentinel认为主节点主观下线那么会通知;如果设置了runid,会调用sentinelVoteLeader进行投票
++投票的逻辑很简单,就是判断epoch。如果sentinel发送过来的请求的epoch比较大就投给它,和raft选举领导一样++
//跟新时间 并且设置该sentinel认为下线
ri->last_master_down_reply_time = mstime();
if (r->element[0]->integer == 1) {
ri->flags |= SRI_MASTER_DOWN;
} else {
ri->flags &= ~SRI_MASTER_DOWN;
}
//如果是一个投票 获取投票结果 打印日志并且设置leader
if (strcmp(r->element[1]->str,"*")) {
/* If the runid in the reply is not "*" the Sentinel actually
* replied with a vote. */
sdsfree(ri->leader);
if ((long long)ri->leader_epoch != r->element[2]->integer)
serverLog(LL_WARNING,
"%s voted for %s %llu", ri->name,
r->element[1]->str,
(unsigned long long) r->element[2]->integer);
ri->leader = sdsnew(r->element[1]->str);
ri->leader_epoch = r->element[2]->integer;
}
}
复制代码
投票的回调方法
执行故障转移
/* Run the handler for the current failover state of master `ri`.
 * Nothing is done unless a failover is in progress. States without a
 * handler listed here are left untouched. */
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
    serverAssert(ri->flags & SRI_MASTER);

    if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;

    if (ri->failover_state == SENTINEL_FAILOVER_STATE_WAIT_START) {
        /* Waiting to find out whether we lead this failover. */
        sentinelFailoverWaitStart(ri);
    } else if (ri->failover_state == SENTINEL_FAILOVER_STATE_SELECT_SLAVE) {
        /* Pick the slave to promote. */
        sentinelFailoverSelectSlave(ri);
    } else if (ri->failover_state == SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE) {
        /* Ask the selected slave to become a master. */
        sentinelFailoverSendSlaveOfNoOne(ri);
    } else if (ri->failover_state == SENTINEL_FAILOVER_STATE_WAIT_PROMOTION) {
        /* Waiting for the promotion to be observed. */
        sentinelFailoverWaitPromotion(ri);
    } else if (ri->failover_state == SENTINEL_FAILOVER_STATE_RECONF_SLAVES) {
        /* Reconfigure the remaining slaves. */
        sentinelFailoverReconfNextSlave(ri);
    }
}
复制代码
故障转移状态机,下图看状态是在什么时候转移的
从客观下线修改状态开始,前面四个状态都是单向流动的,直到最后一个状态是在info命令的回调改变的,看一下状态机流程:
- SENTINEL_FAILOVER_STATE_WAIT_START
/* Handler for SENTINEL_FAILOVER_STATE_WAIT_START.
 *
 * We move on to SENTINEL_FAILOVER_STATE_SELECT_SLAVE when we are the leader
 * for the failover epoch, or when the failover is forced
 * (SRI_FORCE_FAILOVER). Otherwise we keep waiting, and abort the failover
 * once the election timeout has elapsed since failover_start_time. */
void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
    char *leader = sentinelGetLeader(ri, ri->failover_epoch);
    int isleader = leader && strcasecmp(leader,sentinel.myid) == 0;
    int may_proceed;

    sdsfree(leader);

    may_proceed = isleader || (ri->flags & SRI_FORCE_FAILOVER);
    if (!may_proceed) {
        /* The election timeout is capped by the configured failover
         * timeout. */
        int election_timeout = SENTINEL_ELECTION_TIMEOUT;

        if (election_timeout > ri->failover_timeout)
            election_timeout = ri->failover_timeout;
        if (mstime() - ri->failover_start_time > election_timeout) {
            sentinelEvent(LL_WARNING,"-failover-abort-not-elected",ri,"%@");
            sentinelAbortFailover(ri);
        }
        return;
    }

    sentinelEvent(LL_WARNING,"+elected-leader",ri,"%@");
    if (sentinel.simfailure_flags & SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION)
        sentinelSimFailureCrash();

    /* Advance the state machine. */
    ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
    ri->failover_state_change_time = mstime();
    sentinelEvent(LL_WARNING,"+failover-state-select-slave",ri,"%@");
}
复制代码
通过sentinelGetLeader去统计其他票数,如果是自己就开始下一个状态
di = dictGetIterator(master->sentinels);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
if (ri->leader != NULL && ri->leader_epoch == sentinel.current_epoch)
sentinelLeaderIncr(counters,ri->leader);
}
dictReleaseIterator(di);
复制代码
统计票数逻辑
- SENTINEL_FAILOVER_STATE_SELECT_SLAVE
void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
sentinelRedisInstance *slave = sentinelSelectSlave(ri);
复制代码
通过sentinelSelectSlave方法选择一个从节点升级为主节点
/* Choose the slave of `master` to promote.
 *
 * A slave is discarded when any of the following is true:
 *  - it is flagged SRI_S_DOWN or SRI_O_DOWN;
 *  - its link is disconnected;
 *  - its last available time is older than SENTINEL_PING_PERIOD*5;
 *  - its priority is 0;
 *  - its INFO data is older than the validity window;
 *  - its master link has been down for longer than the allowed maximum.
 *
 * The remaining slaves are sorted with compareSlavesForPromotion() and the
 * first one is returned. NULL is returned when no slave qualifies. */
sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
    sentinelRedisInstance **candidates =
        zmalloc(sizeof(candidates[0])*dictSize(master->slaves));
    sentinelRedisInstance *selected = NULL;
    int ncandidates = 0;
    dictIterator *iter;
    dictEntry *entry;
    mstime_t max_master_down_time;
    mstime_t info_validity_time;

    /* Allowed master link down time: ten times down_after_period, plus the
     * time the master has been subjectively down, if it is. */
    max_master_down_time = master->down_after_period * 10;
    if (master->flags & SRI_S_DOWN)
        max_master_down_time += mstime() - master->s_down_since_time;

    /* The INFO validity window is shorter while the master is subjectively
     * down. */
    info_validity_time = (master->flags & SRI_S_DOWN) ?
                         SENTINEL_PING_PERIOD*5 : SENTINEL_INFO_PERIOD*3;

    iter = dictGetIterator(master->slaves);
    for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
        sentinelRedisInstance *slave = dictGetVal(entry);

        /* Availability and priority filters. */
        if ((slave->flags & (SRI_S_DOWN|SRI_O_DOWN)) ||
            slave->link->disconnected ||
            mstime() - slave->link->last_avail_time > SENTINEL_PING_PERIOD*5 ||
            slave->slave_priority == 0) continue;

        /* Freshness filters. */
        if (mstime() - slave->info_refresh > info_validity_time) continue;
        if (slave->master_link_down_time > max_master_down_time) continue;

        candidates[ncandidates++] = slave;
    }
    dictReleaseIterator(iter);

    if (ncandidates > 0) {
        qsort(candidates,ncandidates,sizeof(candidates[0]),
            compareSlavesForPromotion);
        selected = candidates[0];
    }
    zfree(candidates);
    return selected;
}
复制代码
从slave中选择优先级高、活跃的节点来作为新的主节点,选中节点后修改状态执行下一步
- SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE
/* Handler for SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE.
 *
 * While the link to the promoted slave is disconnected nothing is sent; the
 * failover is aborted once failover_timeout has elapsed since the last state
 * change. Otherwise sentinelSendSlaveOf() is called with a NULL host and
 * port 0, and on success the state becomes
 * SENTINEL_FAILOVER_STATE_WAIT_PROMOTION. */
void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
    if (ri->promoted_slave->link->disconnected) {
        mstime_t waited = mstime() - ri->failover_state_change_time;

        if (waited > ri->failover_timeout) {
            sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
            sentinelAbortFailover(ri);
        }
        return;
    }

    /* If the command cannot be sent, stay in this state. */
    if (sentinelSendSlaveOf(ri->promoted_slave,NULL,0) == C_OK) {
        sentinelEvent(LL_NOTICE, "+failover-state-wait-promotion",
            ri->promoted_slave,"%@");
        ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
        ri->failover_state_change_time = mstime();
    }
}
复制代码
执行sentinelSendSlaveOf来升级被选中的从节点为主节点,主要执行slave of命令,并且修改状态进行下一步
- SENTINEL_FAILOVER_STATE_WAIT_PROMOTION
/* Handler for SENTINEL_FAILOVER_STATE_WAIT_PROMOTION. This function only
 * enforces the timeout: the failover is aborted when more than
 * failover_timeout milliseconds have passed since the last state change. */
void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
    if (mstime() - ri->failover_state_change_time <= ri->failover_timeout)
        return;

    sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
    sentinelAbortFailover(ri);
}
复制代码
这一步是等待升级的完成,准确地说是等待info命令的回调来确认升级结果
if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
//如果状态机是在 等待 状态
if ((ri->flags & SRI_PROMOTED) &&
(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
(ri->master->failover_state ==
SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
{
//执行切换
ri->master->config_epoch = ri->master->failover_epoch;
ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
ri->master->failover_state_change_time = mstime();
sentinelFlushConfig();
sentinelEvent(LL_WARNING,"+promoted-slave",ri,"%@");
if (sentinel.simfailure_flags &
SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION)
sentinelSimFailureCrash();
sentinelEvent(LL_WARNING,"+failover-state-reconf-slaves",
ri->master,"%@");
sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER,
"start",ri->master->addr,ri->addr);
sentinelForceHelloUpdateForMaster(ri->master);
} else {
复制代码
当info回调用判断到主从的切换,并且状态机是在等待状态,就会执行主从切换并且转移到下一个状态
- SENTINEL_FAILOVER_STATE_RECONF_SLAVES
最终对所有slave节点发送slaveof命令来进行主从切换,这个阶段成功后故障转移流程执行完毕
总结
故障转移流程看起来很复杂,尤其是info命令的回调函数,里面包含了很多情况的处理方法;每当master出现问题的时候,sentinel会进行投票选中一个sentinel成为leader来负责选主操作,会在健康的slave中选择一个成为主节点。整个选举过程利用了raft的leader选举方法来进行选举,通过超时来中断选举,通过状态机可以清楚的看出来选举过程中流程转变