前言
Redis是基于内存的数据库,断电以后内存里的数据会丢失。Redis支持通过AOF进行数据持久化,以提高Redis服务的容错性。本文将通过分析AOF源码的执行流程,梳理出AOF的实现逻辑和策略(但不分析AOF实际的数据保存格式)。
正文
如图所示,以上是总结出来的aof执行流程,下面将从图中所涉及的方法分析具体源码。
命令传播
if (server.loading && c->flags & CLIENT_LUA)
flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);
if (c->flags & CLIENT_LUA && server.lua_caller) {
if (c->flags & CLIENT_FORCE_REPL)
server.lua_caller->flags |= CLIENT_FORCE_REPL;
if (c->flags & CLIENT_FORCE_AOF)
server.lua_caller->flags |= CLIENT_FORCE_AOF;
}
//记录慢查询
if (flags & CMD_CALL_SLOWLOG && !(c->cmd->flags & CMD_SKIP_SLOWLOG)) {
char *latency_event = (c->cmd->flags & CMD_FAST) ?
"fast-command" : "command";
latencyAddSampleIfNeeded(latency_event,duration/1000);
slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
}
if (flags & CMD_CALL_STATS) {
/* use the real command that was executed (cmd and lastamc) may be
* different, in case of MULTI-EXEC or re-written commands such as
* EXPIRE, GEOADD, etc. */
real_cmd->microseconds += duration;
real_cmd->calls++;
}
/* Propagate the command into the AOF and replication link */
if (flags & CMD_CALL_PROPAGATE &&
(c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
{
int propagate_flags = PROPAGATE_NONE;
//检查命令是否更改
if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
//强制aof或者主从
if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
//防止命令传播 因为有些命令执行过程中已传播
if (c->flags & CLIENT_PREVENT_REPL_PROP ||
!(flags & CMD_CALL_PROPAGATE_REPL))
propagate_flags &= ~PROPAGATE_REPL;
if (c->flags & CLIENT_PREVENT_AOF_PROP ||
!(flags & CMD_CALL_PROPAGATE_AOF))
propagate_flags &= ~PROPAGATE_AOF;
//传播命令
if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
}
//恢复client标志
c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
c->flags |= client_old_flags &
(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
复制代码
调用call方法执行完命令后,都会对flags进行判断,这么一大段代码,主要是为了执行这一句
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
复制代码
/* Fan an executed command out to its persistence/replication targets.
 *
 * flags is a bitmask of PROPAGATE_AOF / PROPAGATE_REPL:
 *  - PROPAGATE_AOF: the command is fed to the AOF via feedAppendOnlyFile(),
 *    but only while AOF is not switched off (server.aof_state != AOF_OFF).
 *  - PROPAGATE_REPL: the command is fed to the replicas via
 *    replicationFeedSlaves(server.slaves, ...).
 * dbid is the database the command ran against; argv/argc are the command
 * arguments as they should be propagated. */
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    int to_aof = server.aof_state != AOF_OFF && (flags & PROPAGATE_AOF);
    int to_replicas = (flags & PROPAGATE_REPL) != 0;

    if (to_aof)
        feedAppendOnlyFile(cmd, dbid, argv, argc);
    if (to_replicas)
        replicationFeedSlaves(server.slaves, dbid, argv, argc);
}
复制代码
propagate会进行两种传播:aof传播和主从传播。如果开启了aof功能,并且传播标志中包含PROPAGATE_AOF(即命令修改了数据,或者被强制写入AOF),那么执行完命令后就会调用feedAppendOnlyFile方法
feedAppendOnlyFile有三个阶段
- 切换db:
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
sds buf = sdsempty();
robj *tmpargv[3];
//该命令作用的db和aof选择的db不同 重新设置
if (dictid != server.aof_selected_db) {
char seldb[64];
snprintf(seldb,sizeof(seldb),"%d",dictid);
buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
(unsigned long)strlen(seldb),seldb);
server.aof_selected_db = dictid;
}
复制代码
如果当前使用的db和上一次写入aof的不同,那么要切换db并且生成select命令
- 修改命令:
if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
cmd->proc == expireatCommand) {
/* 将EXPIRE/PEXPIRE/EXPIREAT 转换为 PEXPIREAT(毫秒) 并且设置绝对时间*/
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
/*将 SETEX/PSETEX 改成SET和PEXPIREAT */
tmpargv[0] = createStringObject("SET",3);
tmpargv[1] = argv[1];
tmpargv[2] = argv[3];
buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
decrRefCount(tmpargv[0]);
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else if (cmd->proc == setCommand && argc > 3) {
int i;
robj *exarg = NULL, *pxarg = NULL;
for (i = 3; i < argc; i ++) {
if (!strcasecmp(argv[i]->ptr, "ex")) exarg = argv[i+1];
if (!strcasecmp(argv[i]->ptr, "px")) pxarg = argv[i+1];
}
serverAssert(!(exarg && pxarg));
if (exarg || pxarg) {
/* 将SET [EX seconds][PX milliseconds] 改为SET和PEXPIREAT */
buf = catAppendOnlyGenericCommand(buf,3,argv);
if (exarg)
buf = catAppendOnlyExpireAtCommand(buf,server.expireCommand,argv[1],
exarg);
if (pxarg)
buf = catAppendOnlyExpireAtCommand(buf,server.pexpireCommand,argv[1],
pxarg);
} else {
buf = catAppendOnlyGenericCommand(buf,argc,argv);
}
} else {
buf = catAppendOnlyGenericCommand(buf,argc,argv);
}
复制代码
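这一步会把带过期时间的命令(EXPIRE/PEXPIRE/EXPIREAT、SETEX/PSETEX、带EX/PX参数的SET)统一改写成SET加PEXPIREAT(毫秒级绝对时间戳)的形式。这样重放AOF时,过期时间不会因为从加载时刻重新计算而被延长。其余命令则直接按照aof需要的格式编码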
- 写入buf或者rewrite
//写入aof_buf
if (server.aof_state == AOF_ON)
server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));
//如果重写子进程正在运行调用
if (server.aof_child_pid != -1)
aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
sdsfree(buf);
复制代码
最终会将命令追加到aof_buf中,如果重写命令的子进程正在运行,那么会添加到rewrite的buffer中
执行完命令后进行传播,这里并没有进行写盘,只是追加到了缓存中,同时也意味着如果刚好执行了命令,但是redis服务挂掉了,那么命令有可能不会被持久化,有丢失的风险。
命令重写
aof是通过不断追加命令来进行持久化的,当redis运行得越久,追加的命令会导致aof文件越来越大,这个时候就需要对aof进行重写。重写的原理是遍历数据库生成插入命令,同时在这个过程中还要保存主线程的新命令。
重写是一个重量级的操作,遍历db会阻塞主线程,使其无法对外提供服务,所以Redis通过fork创建子进程来执行重写。上面提到,命令传播过程中如果重写子进程正在运行(aof_child_pid != -1),会调用aofRewriteBufferAppend方法
/* Append len bytes starting at s to the AOF rewrite buffer
 * (server.aof_rewrite_buf_blocks), a list of fixed-size aofrwblock chunks.
 *
 * The free space of the current tail block is filled first. Whenever bytes
 * remain, a new block of AOF_RW_BUF_BLOCK_SIZE capacity is appended to the
 * list. After each new block, the total buffer size is logged when
 * (number of blocks + 1) is a multiple of 10: NOTICE normally, WARNING at
 * multiples of 100.
 *
 * Finally, if no event is registered yet on the parent->child data pipe
 * (server.aof_pipe_write_data_to_child), a writable event is created so
 * that aofChildWriteDiffData() forwards the buffered bytes to the child. */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
    listNode *tail = listLast(server.aof_rewrite_buf_blocks);
    aofrwblock *cur = tail ? tail->value : NULL;

    while (len > 0) {
        unsigned long chunk = 0;
        int next_count;

        /* Copy as much as fits into the block we are currently filling. */
        if (cur != NULL)
            chunk = (cur->free < len) ? cur->free : len;
        if (chunk > 0) {
            memcpy(cur->buf + cur->used, s, chunk);
            cur->used += chunk;
            cur->free -= chunk;
            s += chunk;
            len -= chunk;
        }
        if (len == 0)
            break;

        /* Data left over: start a fresh, empty block at the tail. */
        cur = zmalloc(sizeof(*cur));
        cur->free = AOF_RW_BUF_BLOCK_SIZE;
        cur->used = 0;
        listAddNodeTail(server.aof_rewrite_buf_blocks, cur);

        next_count = (int)listLength(server.aof_rewrite_buf_blocks) + 1;
        if (next_count % 10 == 0) {
            int level = (next_count % 100 == 0) ? LL_WARNING : LL_NOTICE;
            serverLog(level,"Background AOF buffer size: %lu MB",
                aofRewriteBufferSize()/(1024*1024));
        }
    }

    /* Make sure someone will push the buffered data to the child. */
    if (aeGetFileEvents(server.el, server.aof_pipe_write_data_to_child) == 0) {
        aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
            AE_WRITABLE, aofChildWriteDiffData, NULL);
    }
}
复制代码
redis使用名为aof_rewrite_buf_blocks的链表来缓存重写期间新增的命令,每个节点是一个10M大小的块。这里先把命令传播的内容填入最后一个块的剩余空间,写满后再新建节点继续写入。
最后,如果管道写端(aof_pipe_write_data_to_child)上还没有注册事件,就调用aeCreateFileEvent注册一个可写事件,事件触发时会调用aofChildWriteDiffData方法把blocks中的数据发送给子进程
/* Writable-event handler for the parent->child data pipe
 * (server.aof_pipe_write_data_to_child).
 *
 * Repeatedly takes the head block of server.aof_rewrite_buf_blocks and
 * writes its pending bytes to the pipe. Sent bytes are shifted out of the
 * block, and a block that becomes empty is removed from the list.
 *  - If write() returns <= 0 (e.g. the non-blocking pipe is full), the
 *    handler returns and stays registered, so it runs again later.
 *  - Once the list is empty, or the child has asked the parent to stop
 *    (server.aof_stop_sending_diff), the writable event is deleted.
 * The event-loop arguments are unused. */
void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
    UNUSED(el);
    UNUSED(fd);
    UNUSED(privdata);
    UNUSED(mask);

    for (;;) {
        listNode *head = listFirst(server.aof_rewrite_buf_blocks);
        aofrwblock *blk = head ? head->value : NULL;

        /* Nothing left to send, or sending has been stopped: unregister. */
        if (blk == NULL || server.aof_stop_sending_diff) {
            aeDeleteFileEvent(server.el, server.aof_pipe_write_data_to_child,
                AE_WRITABLE);
            return;
        }

        if (blk->used > 0) {
            ssize_t sent = write(server.aof_pipe_write_data_to_child,
                                 blk->buf, blk->used);
            if (sent <= 0)
                return;
            /* Drop the bytes that made it into the pipe. */
            memmove(blk->buf, blk->buf + sent, blk->used - sent);
            blk->used -= sent;
            blk->free += sent;
        }

        if (blk->used == 0)
            listDelNode(server.aof_rewrite_buf_blocks, head);
    }
}
复制代码
aofChildWriteDiffData会循环处理aof_rewrite_buf_blocks的数据,使用管道向子进程写入数据
Redis什么时候会进行AOF重写?
if (server.aof_state == AOF_ON &&
!hasActiveChildProcess() &&
server.aof_rewrite_perc &&
server.aof_current_size > server.aof_rewrite_min_size)
{
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;
long long growth = (server.aof_current_size*100/base) - 100;
if (growth >= server.aof_rewrite_perc) {
serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground();
}
}
复制代码
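需要同时满足以下条件:AOF处于开启状态,没有活跃的子进程,设置了重写百分比(aof_rewrite_perc不为0),并且AOF文件当前大小超过aof_rewrite_min_size。满足后,会计算当前大小相对于上次重写后基准大小(aof_rewrite_base_size)的增长百分比。增长百分比达到设置的aof_rewrite_perc时,就调用rewriteAppendOnlyFileBackground方法在后台进行AOF重写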
/* Start an AOF rewrite in a forked child process.
 *
 * Parent side: returns C_ERR when a child process is already active, when
 * aofCreatePipes() fails, or when the fork fails (in which case the child-info
 * pipe and the AOF pipes are closed again). On success it logs the child pid,
 * clears aof_rewrite_scheduled, records the start time and child pid, resets
 * aof_selected_db to -1 so the next AOF write re-emits SELECT, flushes the
 * replication script cache and returns C_OK.
 *
 * Child side: writes the rewritten AOF into "temp-rewriteaof-bg-<pid>.aof"
 * through rewriteAppendOnlyFile(). It then calls exitFromChild(0) after
 * reporting COW info on success, or exitFromChild(1) on failure. */
int rewriteAppendOnlyFileBackground(void) {
    pid_t pid;

    if (hasActiveChildProcess() || aofCreatePipes() != C_OK)
        return C_ERR;
    openChildInfoPipe();

    pid = redisFork();
    if (pid == -1) {
        /* fork failed: undo the pipe setup done above. */
        closeChildInfoPipe();
        serverLog(LL_WARNING,
            "Can't rewrite append only file in background: fork: %s",
            strerror(errno));
        aofClosePipes();
        return C_ERR;
    }

    if (pid == 0) {
        /* Child process. */
        char tmpfile[256];
        int ok;

        redisSetProcTitle("redis-aof-rewrite");
        redisSetCpuAffinity(server.aof_rewrite_cpulist);
        snprintf(tmpfile, sizeof(tmpfile), "temp-rewriteaof-bg-%d.aof",
                 (int) getpid());
        ok = (rewriteAppendOnlyFile(tmpfile) == C_OK);
        if (ok)
            sendChildCOWInfo(CHILD_INFO_TYPE_AOF, "AOF rewrite");
        exitFromChild(ok ? 0 : 1);
        /* Only reached if exitFromChild() returns, which presumably it does
         * not — kept to mirror the original control flow. */
        return C_OK;
    }

    /* Parent process: remember the running rewrite child. */
    serverLog(LL_NOTICE,
        "Background append only file rewriting started by pid %d",pid);
    server.aof_rewrite_scheduled = 0;
    server.aof_rewrite_time_start = time(NULL);
    server.aof_child_pid = pid;
    server.aof_selected_db = -1;
    replicationScriptCacheFlush();
    return C_OK;
}
复制代码
- 判断是否已经有子进程在运行,如果有就中断本次操作
- 调用aofCreatePipes创建一系列管道来进行通信
- fork子进程,创建临时文件调用rewriteAppendOnlyFile方法
int aofCreatePipes(void) {
int fds[6] = {-1, -1, -1, -1, -1, -1};
int j;
if (pipe(fds) == -1) goto error; /* parent -> children data. */
if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */
if (pipe(fds+4) == -1) goto error; /* parent -> children ack. */
/* Parent -> children data is non blocking. */
if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;
server.aof_pipe_write_data_to_child = fds[1];
server.aof_pipe_read_data_from_parent = fds[0];
server.aof_pipe_write_ack_to_parent = fds[3];
server.aof_pipe_read_ack_from_child = fds[2];
server.aof_pipe_write_ack_to_child = fds[5];
server.aof_pipe_read_ack_from_parent = fds[4];
server.aof_stop_sending_diff = 0;
return C_OK;
复制代码
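这里调用pipe创建了3个管道(共6个文件描述符):父进程向子进程发送增量数据的数据管道、子进程向父进程发送ack的管道,以及父进程向子进程发送ack的管道。其中数据管道的读写两端都被设置为非阻塞,子进程→父进程ack管道的读端注册了可读事件,由aofChildPipeReadable处理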
int rewriteAppendOnlyFile(char *filename) {
rio aof;
FILE *fp;
char tmpfile[256];
char byte;
//打开临时文件
snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
return C_ERR;
}
server.aof_child_diff = sdsempty();
rioInitWithFile(&aof,fp);
//设置autoSync
if (server.aof_rewrite_incremental_fsync)
rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);
startSaving(RDBFLAGS_AOF_PREAMBLE);
//如果设置了aof使用rdb一样的格式
if (server.aof_use_rdb_preamble) {
int error;
if (rdbSaveRio(&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) {
errno = error;
goto werr;
}
} else {
//遍历dict写入文件
if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
}
复制代码
首先打开临时文件,并将它与rio关联。如果开启了aof_use_rdb_preamble,就先以RDB格式调用rdbSaveRio写入数据;否则调用rewriteAppendOnlyFileRio方法,遍历所有的redisDb,将不同类型的数据格式化成命令写入文件
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
//再次从父进程读取 如果超过1s或循环了20次结束
int nodata = 0;
mstime_t start = mstime();
while(mstime()-start < 1000 && nodata < 20) {
if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
{
nodata++;
continue;
}
nodata = 0;
aofReadDiffFromParent();
}
复制代码
随后子进程循环调用aofReadDiffFromParent,继续读取父进程通过管道发送过来的增量数据。每次最多等待1ms,当总时长超过1s或连续20次没有读到数据时结束循环
/* Child side of the rewrite: drain whatever is currently readable from the
 * parent->child data pipe (server.aof_pipe_read_data_from_parent) and append
 * it to server.aof_child_diff.
 *
 * Reads in chunks of up to 64 KiB until read() returns <= 0 (end of file,
 * an error, or — on a non-blocking fd — no more data right now).
 * Returns the total number of bytes appended by this call. */
ssize_t aofReadDiffFromParent(void) {
    char chunk[65536];
    ssize_t total = 0;

    for (;;) {
        ssize_t got = read(server.aof_pipe_read_data_from_parent,
                           chunk, sizeof(chunk));
        if (got <= 0)
            break;
        server.aof_child_diff = sdscatlen(server.aof_child_diff, chunk, got);
        total += got;
    }
    return total;
}
复制代码
读取的数据存在aof_child_diff中
//让父进程不发数据
if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)
goto werr;
//等待父进程回复ack确认 超时5s
if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
byte != '!') goto werr;
serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");
复制代码
读取一次后向父进程发送命令,告诉父进程不要发送数据了,收到父进程的回应以后再次读取残留的数据,这个时候管道就空了
serverLog(LL_NOTICE,
"Concatenating %.2f MB of AOF diff received from parent.",
(double) sdslen(server.aof_child_diff) / (1024*1024));
if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
goto werr;
//flush和关闭文件
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
if (fclose(fp) == EOF) goto werr;
//重命名文件
if (rename(tmpfile,filename) == -1) {
serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
unlink(tmpfile);
stopSaving(0);
return C_ERR;
}
serverLog(LL_NOTICE,"SYNC append only file rewrite performed");
stopSaving(1);
return C_OK;
复制代码
然后将aof_child_diff中的数据写入临时文件,flush并fsync到磁盘,最后把临时文件重命名为目标文件名(filename),新的aof文件就生成了。
通过上面两个流程可以看出来,命令传播过程中会将命令写入到buf,但是如果这个时候触发了命令的重写机制,重写子进程会顺带处理这些新增的命令进行写盘
rewrite收尾工作
当rewrite进程执行完毕后,还要对上面的操作进行收尾,因为blocks里面的数据还没有完全写入
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
if (!bysignal && exitcode == 0) {
int newfd, oldfd;
char tmpfile[256];
long long now = ustime();
mstime_t latency;
serverLog(LL_NOTICE,
"Background AOF rewrite terminated with success");
latencyStartMonitor(latency);
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
(int)server.aof_child_pid);
newfd = open(tmpfile,O_WRONLY|O_APPEND);
if (newfd == -1) {
serverLog(LL_WARNING,
"Unable to open the temporary AOF produced by the child: %s", strerror(errno));
goto cleanup;
}
//将block数据写入到文件
if (aofRewriteBufferWrite(newfd) == -1) {
serverLog(LL_WARNING,
"Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
close(newfd);
goto cleanup;
}
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-rewrite-diff-write",latency);
serverLog(LL_NOTICE,
"Residual parent diff successfully flushed to the rewritten AOF (%.2f MB)", (double) aofRewriteBufferSize() / (1024*1024));
复制代码
这里会以追加方式打开子进程生成的临时文件(temp-rewriteaof-bg-<子进程pid>.aof),然后调用aofRewriteBufferWrite,把父进程重写缓冲区中剩余的数据追加写入该文件
/* Write every pending byte of the AOF rewrite buffer
 * (server.aof_rewrite_buf_blocks) to fd, block by block in list order.
 *
 * Returns the total number of bytes written, or -1 on error with errno set.
 * The buffer itself is not modified.
 *
 * Partial writes are resumed and EINTR is retried. A short write therefore
 * never yields -1 with a stale errno (the caller logs strerror(errno)).
 * A write() that makes no progress (returns 0) is reported as EIO. */
ssize_t aofRewriteBufferWrite(int fd) {
    listNode *ln;
    listIter li;
    ssize_t count = 0;

    listRewind(server.aof_rewrite_buf_blocks,&li);
    while((ln = listNext(&li))) {
        aofrwblock *block = listNodeValue(ln);
        const char *p = (const char *)block->buf;
        size_t left = block->used;

        /* write() may accept fewer bytes than requested (e.g. when the disk
         * is nearly full). Keep writing so that a real failure surfaces via
         * write()'s own errno instead of being reported with a stale one. */
        while (left > 0) {
            ssize_t nwritten = write(fd, p, left);
            if (nwritten == -1) {
                if (errno == EINTR) continue;
                return -1;
            }
            if (nwritten == 0) {
                errno = EIO;
                return -1;
            }
            p += nwritten;
            left -= (size_t)nwritten;
            count += nwritten;
        }
    }
    return count;
}
复制代码
遍历aof_rewrite_buf_blocks节点,写入文件
//替换fd
oldfd = server.aof_fd;
server.aof_fd = newfd;
//写盘
if (server.aof_fsync == AOF_FSYNC_ALWAYS)
redis_fsync(newfd);
else if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
aof_background_fsync(newfd);
server.aof_selected_db = -1; /* Make sure SELECT is re-issued */
aofUpdateCurrentSize();
server.aof_rewrite_base_size = server.aof_current_size;
server.aof_fsync_offset = server.aof_current_size;
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
复制代码
随后就是替换aof文件描述符、按照fsync策略执行同步、更新aof文件大小等收尾工作。
文件同步
通过上面可以知道,命令执行后只是被追加到了aof_buf中(重写期间还会额外追加到重写缓冲区,供重写子进程使用)。真正把aof_buf写入AOF文件并同步到磁盘的,是flushAppendOnlyFile方法
同步有三种模式:
- AOF_FSYNC_ALWAYS:将aof_buf中的内容写入文件,并立即在当前线程中同步(fsync)
- AOF_FSYNC_EVERYSEC:写入文件后,如果距离上次同步已超过1s,则对aof文件进行同步,同步操作交给后台线程执行
- AOF_FSYNC_NO:将aof_buf中的所有内容写入aof文件,但不主动同步,何时同步由操作系统决定
void flushAppendOnlyFile(int force) {
ssize_t nwritten;
int sync_in_progress = 0;
mstime_t latency;
//仅当aof_buf不为空的时候才进行
if (sdslen(server.aof_buf) == 0) {
//策略为everysec 有已写入但未fsync的数据 距上次fsync已超过1s 且后台没有fsync在进行时 尝试fsync
if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.aof_fsync_offset != server.aof_current_size &&
server.unixtime > server.aof_last_fsync &&
!(sync_in_progress = aofFsyncInProgress())) {
goto try_fsync;
} else {
return;
}
}
复制代码
flushAppendOnlyFile原则上只在aof_buf不为空时才执行写入。但如果fsync策略是AOF_FSYNC_EVERYSEC,即使aof_buf为空,也可能存在已经write到文件、但还没有fsync到磁盘的数据(aof_fsync_offset != aof_current_size)。所以在距离上次fsync已超过1s、且后台没有正在进行的fsync时,会直接跳转到try_fsync尝试同步
try_fsync:
if (server.aof_no_fsync_on_rewrite && hasActiveChildProcess())
return;
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
latencyStartMonitor(latency);
//当前线程进行
redis_fsync(server.aof_fd);
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-fsync-always",latency);
server.aof_fsync_offset = server.aof_current_size;
server.aof_last_fsync = server.unixtime;
} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) {
if (!sync_in_progress) {
//另外线程处理
aof_background_fsync(server.aof_fd);
server.aof_fsync_offset = server.aof_current_size;
}
server.aof_last_fsync = server.unixtime;
}
}
复制代码
if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
sync_in_progress = aofFsyncInProgress();
if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
//已经有fsync任务进程正在运行
if (sync_in_progress) {
if (server.aof_flush_postponed_start == 0) {
//重置推迟时间
server.aof_flush_postponed_start = server.unixtime;
return;
} else if (server.unixtime - server.aof_flush_postponed_start < 2) {
//已经在等待fsync完成 但是时间还不足两秒 继续等待
return;
}
//增加aof_delayed_fsync次数 并且提示
server.aof_delayed_fsync++;
serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
}
}
复制代码
如果后台线程已经在进行fsync,就推迟本次写入。第一次推迟时,用aof_flush_postponed_start记录开始时间后直接返回;推迟时间不足2秒时继续返回等待;超过2秒后不再等待,而是累加aof_delayed_fsync、打印日志,然后继续执行写入
//aof_flush_sleep是debug里面设定了 为了在flush之前挂起一段时间
if (server.aof_flush_sleep && sdslen(server.aof_buf)) {
usleep(server.aof_flush_sleep);
}
latencyStartMonitor(latency);
//写入磁盘 没有flush
nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
latencyEndMonitor(latency);
//记录各种时间
if (sync_in_progress) {
latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
} else if (hasActiveChildProcess()) {
latencyAddSampleIfNeeded("aof-write-active-child",latency);
} else {
latencyAddSampleIfNeeded("aof-write-alone",latency);
}
latencyAddSampleIfNeeded("aof-write",latency);
//执行了写入 将推迟记为0
server.aof_flush_postponed_start = 0;
复制代码
写入aof_buf到文件中,并且返回写入的长度
//长度不一致表示有异常发生 后面记录异常
if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
static time_t last_write_error_log = 0;
int can_log = 0;
if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
can_log = 1;
last_write_error_log = server.unixtime;
}
if (nwritten == -1) {
if (can_log) {
serverLog(LL_WARNING,"Error writing to the AOF file: %s",
strerror(errno));
server.aof_last_write_errno = errno;
}
} else {
if (can_log) {
serverLog(LL_WARNING,"Short write while writing to "
"the AOF file: (nwritten=%lld, "
"expected=%lld)",
(long long)nwritten,
(long long)sdslen(server.aof_buf));
}
//利用ftruncate回退刚才的写入
if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
if (can_log) {
serverLog(LL_WARNING, "Could not remove short write "
"from the append-only file. Redis may refuse "
"to load the AOF the next time it starts. "
"ftruncate: %s", strerror(errno));
}
} else {
nwritten = -1;
}
server.aof_last_write_errno = ENOSPC;
}
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
exit(1);
} else {
server.aof_last_write_status = C_ERR;
//无法使用ftruncate撤回 只能确保已写入部分被记录
if (nwritten > 0) {
server.aof_current_size += nwritten;
sdsrange(server.aof_buf,nwritten,-1);
}
return;
}
} else {
//写入成功 打印日志 表示上一次失败了 但是这次成功了
if (server.aof_last_write_status == C_ERR) {
serverLog(LL_WARNING,
"AOF write error looks solved, Redis can write again.");
server.aof_last_write_status = C_OK;
}
}
server.aof_current_size += nwritten;
//小于4000的时候清除 大于直接删除然后分配新的
if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
sdsclear(server.aof_buf);
} else {
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
}
复制代码
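这段代码判断实际写入的长度是否与aof_buf的长度相等,不相等说明写入出错。如果是部分写入,会尝试用ftruncate回退刚才写入的内容。如果fsync策略是always,则直接退出进程;否则记录写入失败状态,并在无法回退时把已写入的部分计入aof_current_size、从aof_buf中移除。写入成功时,则累加文件大小并清空aof_buf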
if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
run_with_period(1000) {
if (server.aof_last_write_status == C_ERR)
flushAppendOnlyFile(0);
}
复制代码
serverCron里面有两处调用了flushAppendOnlyFile:一处是在上一次flush被推迟(aof_flush_postponed_start不为0)时再次尝试;另一处每秒执行一次,仅在上一次写入AOF失败(aof_last_write_status == C_ERR)时重试
总结
以上就是redis的aof执行逻辑以及时机。至于flushAppendOnlyFile方法究竟是在什么时候被调用的,这一点留到后面redis网络部分再讲