Redis6.0.6源码阅读——aof持久化

前言

Redis是基于内存的数据库,断电以后内存里面的数据会丢失,Redis支持AOF来进行数据的持久化,提高redis服务的容错性,本文将分析aof源码执行流程分析出aof实现逻辑和策略(但是不分析aof实际数据保存格式)

正文

aof执行逻辑.png

如图所示,以上是总结出来的aof执行流程,下面将从图中所涉及的方法分析具体源码。

命令传播

if (server.loading && c->flags & CLIENT_LUA)
        flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);
    
    if (c->flags & CLIENT_LUA && server.lua_caller) {
        if (c->flags & CLIENT_FORCE_REPL)
            server.lua_caller->flags |= CLIENT_FORCE_REPL;
        if (c->flags & CLIENT_FORCE_AOF)
            server.lua_caller->flags |= CLIENT_FORCE_AOF;
    }

    //记录慢查询
    if (flags & CMD_CALL_SLOWLOG && !(c->cmd->flags & CMD_SKIP_SLOWLOG)) {
        char *latency_event = (c->cmd->flags & CMD_FAST) ?
                              "fast-command" : "command";
        latencyAddSampleIfNeeded(latency_event,duration/1000);
        slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
    }


    if (flags & CMD_CALL_STATS) {
        /* use the real command that was executed (cmd and lastamc) may be
         * different, in case of MULTI-EXEC or re-written commands such as
         * EXPIRE, GEOADD, etc. */
        real_cmd->microseconds += duration;
        real_cmd->calls++;
    }

    /* Propagate the command into the AOF and replication link */
    if (flags & CMD_CALL_PROPAGATE &&
        (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
    {
        int propagate_flags = PROPAGATE_NONE;

        //检查命令是否更改
        if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);

        //强制aof或者主从
        if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
        if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;

        //防止命令传播 因为有些命令执行过程中已传播
        if (c->flags & CLIENT_PREVENT_REPL_PROP ||
            !(flags & CMD_CALL_PROPAGATE_REPL))
                propagate_flags &= ~PROPAGATE_REPL;
        if (c->flags & CLIENT_PREVENT_AOF_PROP ||
            !(flags & CMD_CALL_PROPAGATE_AOF))
                propagate_flags &= ~PROPAGATE_AOF;

        //传播命令
        if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
    }

    //恢复client标志
    c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
    c->flags |= client_old_flags &
        (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);

复制代码

调用call方法执行完命令后,都会对flags进行判断,这么一大段代码,主要是为了执行这一句

propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
复制代码
/* Hand an executed command over to the AOF and/or to the replicas.
 *
 * 'flags' is a bitmask built from PROPAGATE_AOF and PROPAGATE_REPL:
 *  - PROPAGATE_AOF:  the command is passed to feedAppendOnlyFile(), but only
 *                    while server.aof_state is not AOF_OFF.
 *  - PROPAGATE_REPL: the command is passed to replicationFeedSlaves() for
 *                    server.slaves.
 * 'cmd', 'dbid', 'argv' and 'argc' are forwarded unchanged. */
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    const int to_aof = (flags & PROPAGATE_AOF) != 0;
    const int to_replicas = (flags & PROPAGATE_REPL) != 0;

    if (to_aof && server.aof_state != AOF_OFF) {
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    }
    if (to_replicas) {
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
    }
}
复制代码

propagate会进行两个传播:aof传播和主从传播。如果开启了aof功能,那么每次执行完命令后都会调用feedAppendOnlyFile方法

feedAppendOnlyFile有三个阶段

  1. 切换db:
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds buf = sdsempty();
    robj *tmpargv[3];

    //该命令作用的db和aof选择的db不同 重新设置
    if (dictid != server.aof_selected_db) {
        char seldb[64];

        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
    }
复制代码

如果当前使用的db和上一次写入aof的不同,那么要切换db并且生成select命令

  2. 修改命令:
 if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand) {
        /* 将EXPIRE/PEXPIRE/EXPIREAT 转换为 PEXPIREAT(毫秒) 并且设置绝对时间*/
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        /*将 SETEX/PSETEX 改成SET和PEXPIREAT */
        tmpargv[0] = createStringObject("SET",3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
        decrRefCount(tmpargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setCommand && argc > 3) {
        int i;
        robj *exarg = NULL, *pxarg = NULL;
        for (i = 3; i < argc; i ++) {
            if (!strcasecmp(argv[i]->ptr, "ex")) exarg = argv[i+1];
            if (!strcasecmp(argv[i]->ptr, "px")) pxarg = argv[i+1];
        }
        serverAssert(!(exarg && pxarg));

        if (exarg || pxarg) {
            /* 将SET [EX seconds][PX milliseconds] 改为SET和PEXPIREAT */
            buf = catAppendOnlyGenericCommand(buf,3,argv);
            if (exarg)
                buf = catAppendOnlyExpireAtCommand(buf,server.expireCommand,argv[1],
                                                   exarg);
            if (pxarg)
                buf = catAppendOnlyExpireAtCommand(buf,server.pexpireCommand,argv[1],
                                                   pxarg);
        } else {
            buf = catAppendOnlyGenericCommand(buf,argc,argv);
        }
    } else {
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }
复制代码

会处理一些SETEX等命令生成特定的格式,同时将命令按照aof需要的格式编码

  3. 写入buf或者rewrite缓冲区
//写入aof_buf
    if (server.aof_state == AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

    //如果重写子进程正在运行调用
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));

    sdsfree(buf);
复制代码

最终会将命令追加到aof_buf中,如果重写命令的子进程正在运行,那么会添加到rewrite的buffer中

执行完命令后进行传播,这里并没有进行写盘,只是追加到了缓存中,同时也意味着如果刚好执行了命令,但是redis服务挂掉了,那么命令有可能不会被持久化,有丢失的风险。

命令重写

aof是通过不断追加命令来进行持久化的,当redis运行得越久,追加的命令会导致aof文件越来越大,这个时候就需要对aof进行重写。重写的原理是遍历数据库生成插入命令,同时在这个过程中还要保存主线程的新命令。

重写是一个重量级的操作,遍历db会阻塞主线程使得不能对外提供服务,所以就放到子进程来进行。上面提到命令传播过程中如果重写子进程正在执行,会调用aofRewriteBufferAppend方法

/* Append 'len' bytes starting at 's' to the rewrite buffer, which is the
 * list server.aof_rewrite_buf_blocks of aofrwblock nodes.
 *
 * Data is copied into the last block of the list as long as it has free
 * space; whenever bytes are still pending a new block with
 * AOF_RW_BUF_BLOCK_SIZE free bytes is allocated and added at the tail.
 * Before returning, a writable file event on
 * server.aof_pipe_write_data_to_child is installed (unless the descriptor
 * already has events registered) with aofChildWriteDiffData as handler. */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
    listNode *ln = listLast(server.aof_rewrite_buf_blocks);
    aofrwblock *block = ln ? ln->value : NULL;

    while(len) {
        // If there is already a block, try to append to it.
        if (block) {
            unsigned long thislen = (block->free < len) ? block->free : len;
            // The block still has room: copy what fits and advance the
            // source pointer / remaining length accordingly.
            if (thislen) {
                memcpy(block->buf+block->used, s, thislen);
                block->used += thislen;
                block->free -= thislen;
                s += thislen;
                len -= thislen;
            }
        }

        // Bytes are still pending (no block yet, or the current one is
        // full): allocate a fresh block and append it to the list. The next
        // loop iteration copies into it.
        if (len) {
            int numblocks;

            block = zmalloc(sizeof(*block));
            block->free = AOF_RW_BUF_BLOCK_SIZE;
            block->used = 0;
            listAddNodeTail(server.aof_rewrite_buf_blocks,block);

            // Log the buffer size each time (numblocks+1) is a multiple of
            // 10: at WARNING level when it is also a multiple of 100,
            // otherwise at NOTICE level.
            numblocks = listLength(server.aof_rewrite_buf_blocks);
            if (((numblocks+1) % 10) == 0) {
                int level = ((numblocks+1) % 100) == 0 ? LL_WARNING :
                                                         LL_NOTICE;
                serverLog(level,"Background AOF buffer size: %lu MB",
                    aofRewriteBufferSize()/(1024*1024));
            }
        }
    }

    // Install the write handler that pushes the buffered data to the child
    // through the pipe, if no file event is registered for that descriptor.
    if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
        aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
            AE_WRITABLE, aofChildWriteDiffData, NULL);
    }
}
复制代码

redis使用了aof_rewrite_buf_blocks的list来对重写命令进行缓存,每一个节点是10M大小,这里将命令传播的内容添加进blocks中,如果超出10M的量会新增节点。

最后会创建事件向子进程写入blocks的数据,调用aeCreateFileEvent来创建,该事件会调用aofChildWriteDiffData方法

/* File event handler that sends the content of the rewrite buffer
 * (server.aof_rewrite_buf_blocks) to the rewriting child through
 * server.aof_pipe_write_data_to_child.
 *
 * Blocks are consumed from the head of the list. The handler removes its
 * own AE_WRITABLE event and returns when server.aof_stop_sending_diff is
 * set or when the list has no more blocks; it simply returns (keeping the
 * event installed) when write() reports zero or an error.
 * None of the callback arguments are used. */
void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
    UNUSED(el);
    UNUSED(fd);
    UNUSED(privdata);
    UNUSED(mask);

    for (;;) {
        listNode *head = listFirst(server.aof_rewrite_buf_blocks);
        aofrwblock *blk = head ? head->value : NULL;

        /* Nothing left to send, or sending was stopped: unregister. */
        if (server.aof_stop_sending_diff || blk == NULL) {
            aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
                              AE_WRITABLE);
            return;
        }

        if (blk->used > 0) {
            ssize_t sent = write(server.aof_pipe_write_data_to_child,
                                 blk->buf,blk->used);
            if (sent <= 0) return;
            /* Shift the unsent tail to the start of the block. */
            memmove(blk->buf,blk->buf+sent,blk->used-sent);
            blk->used -= sent;
            blk->free += sent;
        }

        /* A fully drained block is dropped from the list. */
        if (blk->used == 0) listDelNode(server.aof_rewrite_buf_blocks,head);
    }
}
复制代码

aofChildWriteDiffData会循环处理aof_rewrite_buf_blocks的数据,使用管道向子进程写入数据


++redis是什么时候进行命令重写?++

if (server.aof_state == AOF_ON &&
            !hasActiveChildProcess() &&
            server.aof_rewrite_perc &&
            server.aof_current_size > server.aof_rewrite_min_size)
        {
            long long base = server.aof_rewrite_base_size ?
                server.aof_rewrite_base_size : 1;
            long long growth = (server.aof_current_size*100/base) - 100;
            if (growth >= server.aof_rewrite_perc) {
                serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                rewriteAppendOnlyFileBackground();
            }
        }
复制代码

当aof文件大小超过aof_rewrite_min_size,并且相对于上次重写后的大小(aof_rewrite_base_size)增长的百分比达到设置的百分比(aof_rewrite_perc)时,就要调用rewriteAppendOnlyFileBackground方法来进行命令重写

/* Start an AOF rewrite in a forked child process.
 *
 * Returns C_ERR when another child process is already active, when the
 * parent/child pipes cannot be created, or when the fork fails; returns
 * C_OK in the parent once the child has been started.
 *
 * The child writes the rewritten AOF through rewriteAppendOnlyFile() into
 * "temp-rewriteaof-bg-<pid>.aof" and then calls exitFromChild() with 0 on
 * success or 1 on failure. */
int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;

    // Refuse to start if a child process is already active.
    if (hasActiveChildProcess()) return C_ERR;
    // Create the pipes used for parent <-> child communication.
    if (aofCreatePipes() != C_OK) return C_ERR;
    openChildInfoPipe();
    // Fork: a return value of 0 means we are in the child.
    if ((childpid = redisFork()) == 0) {
        char tmpfile[256];

        // Child: set process title and CPU affinity, then rewrite into a
        // temp file whose name contains the child's own pid.
        redisSetProcTitle("redis-aof-rewrite");
        redisSetCpuAffinity(server.aof_rewrite_cpulist);
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
            sendChildCOWInfo(CHILD_INFO_TYPE_AOF, "AOF rewrite");
            exitFromChild(0);
        } else {
            exitFromChild(1);
        }
    } else {
        // Parent. childpid == -1 means the fork failed: undo the pipe
        // setup and report the error.
        // NOTE(review): errno is read after closeChildInfoPipe(); confirm
        // that call cannot overwrite the errno set by the failed fork.
        if (childpid == -1) {
            closeChildInfoPipe();
            serverLog(LL_WARNING,
                "Can't rewrite append only file in background: fork: %s",
                strerror(errno));
            aofClosePipes();
            return C_ERR;
        }
        serverLog(LL_NOTICE,
            "Background append only file rewriting started by pid %d",childpid);
        server.aof_rewrite_scheduled = 0;
        server.aof_rewrite_time_start = time(NULL);
        server.aof_child_pid = childpid;
        // Reset the selected db so that feedAppendOnlyFile() sees a db
        // mismatch on the next command and emits a SELECT first.
        server.aof_selected_db = -1;
        replicationScriptCacheFlush();
        return C_OK;
    }
    // Only reachable from the child branch, and only if exitFromChild()
    // returns — presumably it never does; verify.
    return C_OK;
}
复制代码
  • 判断是否已有活跃的子进程,如果有就中断本次操作
  • 调用aofCreatePipes创建一系列管道来进行通信
  • fork子进程,创建临时文件调用rewriteAppendOnlyFile方法
int aofCreatePipes(void) {
    int fds[6] = {-1, -1, -1, -1, -1, -1};
    int j;

    if (pipe(fds) == -1) goto error; /* parent -> children data. */
    if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */
    if (pipe(fds+4) == -1) goto error; /* parent -> children ack. */
    /* Parent -> children data is non blocking. */
    if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
    if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
    if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;

    server.aof_pipe_write_data_to_child = fds[1];
    server.aof_pipe_read_data_from_parent = fds[0];
    server.aof_pipe_write_ack_to_parent = fds[3];
    server.aof_pipe_read_ack_from_child = fds[2];
    server.aof_pipe_write_ack_to_child = fds[5];
    server.aof_pipe_read_ack_from_parent = fds[4];
    server.aof_stop_sending_diff = 0;
    return C_OK;

复制代码

创建了3个管道(共6个文件描述符),分别是父进程向子进程发送数据的管道、子进程向父进程发送ack的管道,以及父进程向子进程发送ack的管道

int rewriteAppendOnlyFile(char *filename) {
    rio aof;
    FILE *fp;
    char tmpfile[256];
    char byte;

    //打开临时文件
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
        return C_ERR;
    }

    server.aof_child_diff = sdsempty();
    rioInitWithFile(&aof,fp);

    //设置autoSync
    if (server.aof_rewrite_incremental_fsync)
        rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);

    startSaving(RDBFLAGS_AOF_PREAMBLE);

    //如果设置了aof使用rdb一样的格式
    if (server.aof_use_rdb_preamble) {
        int error;
        if (rdbSaveRio(&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) {
            errno = error;
            goto werr;
        }
    } else {
        //遍历dict写入文件
        if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
    }
复制代码

首先打开临时文件,然后将临时文件同rio关联。如果开启了aof_use_rdb_preamble,则调用rdbSaveRio以RDB格式写入数据;否则运行rewriteAppendOnlyFileRio方法,遍历所有的redisDB,将不同的数据格式化成命令形式写入文件

if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;

    //再次从父进程读取 如果超过1s或循环了20次结束
    int nodata = 0;
    mstime_t start = mstime();
    while(mstime()-start < 1000 && nodata < 20) {
        if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
        {
            nodata++;
            continue;
        }
        nodata = 0;
        aofReadDiffFromParent();
    }
复制代码

在最多1秒的时间内(如果连续20次等待都没有数据则提前结束)反复调用aofReadDiffFromParent从父进程读取数据

/* Read everything currently available on
 * server.aof_pipe_read_data_from_parent and append it to
 * server.aof_child_diff.
 *
 * Data is pulled in chunks of up to 64 KB; the loop stops at the first
 * read() that returns zero or a negative value.
 * Returns the number of bytes appended by this call. */
ssize_t aofReadDiffFromParent(void) {
    char chunk[65536];
    ssize_t received = 0;

    for (;;) {
        ssize_t got = read(server.aof_pipe_read_data_from_parent,
                           chunk,sizeof(chunk));
        if (got <= 0) break;
        server.aof_child_diff = sdscatlen(server.aof_child_diff,chunk,got);
        received += got;
    }
    return received;
}
复制代码

读取的数据存在aof_child_diff中

 //让父进程不发数据
    if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
    if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)
        goto werr;
    //读取父进程数据 进行确认 超时5s(syncRead的超时参数为5000毫秒)
    if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
        byte != '!') goto werr;
    serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");

复制代码

读取一次后向父进程发送命令,告诉父进程不要发送数据了,收到父进程的回应以后再次读取残留的数据,这个时候管道就空了

    serverLog(LL_NOTICE,
        "Concatenating %.2f MB of AOF diff received from parent.",
        (double) sdslen(server.aof_child_diff) / (1024*1024));
    if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
        goto werr;


    //flush和关闭文件
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;

    //重命名文件
    if (rename(tmpfile,filename) == -1) {
        serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        stopSaving(0);
        return C_ERR;
    }
    serverLog(LL_NOTICE,"SYNC append only file rewrite performed");
    stopSaving(1);
    return C_OK;

复制代码

然后将aof_child_diff数据写入临时文件,将文件sync到磁盘,最后对文件进行改名,新的aof就生成了。

通过上面两个流程可以看出来,命令传播过程中会将命令写入到buf,但是如果这个时候触发了命令的重写机制,重写子进程会顺带处理这些新增的命令进行写盘

rewrite收尾工作

当rewrite进程执行完毕后,还要对上面的操作进行收尾,因为blocks里面的数据还没有完全写入

void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
    if (!bysignal && exitcode == 0) {
        int newfd, oldfd;
        char tmpfile[256];
        long long now = ustime();
        mstime_t latency;

        serverLog(LL_NOTICE,
            "Background AOF rewrite terminated with success");

        latencyStartMonitor(latency);
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
            (int)server.aof_child_pid);
        newfd = open(tmpfile,O_WRONLY|O_APPEND);
        if (newfd == -1) {
            serverLog(LL_WARNING,
                "Unable to open the temporary AOF produced by the child: %s", strerror(errno));
            goto cleanup;
        }

        //将block数据写入到文件
        if (aofRewriteBufferWrite(newfd) == -1) {
            serverLog(LL_WARNING,
                "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
            close(newfd);
            goto cleanup;
        }
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-rewrite-diff-write",latency);

        serverLog(LL_NOTICE,
            "Residual parent diff successfully flushed to the rewritten AOF (%.2f MB)", (double) aofRewriteBufferSize() / (1024*1024));
复制代码

会以追加方式打开子进程生成的临时文件,调用aofRewriteBufferWrite把blocks中剩余的数据写入该文件

/* Write the content of every block in server.aof_rewrite_buf_blocks to
 * the file descriptor 'fd'. The blocks are left untouched in the list.
 *
 * Each block is written completely: when write() accepts only part of the
 * data the remaining bytes are written with further calls, so that a
 * persistent failure ends in a write() error carrying a meaningful errno
 * instead of a short count with a stale errno. A write() interrupted by a
 * signal before transferring anything (EINTR) is retried.
 *
 * Returns the total number of bytes written, or -1 on error with errno set
 * by the failing write() (EIO when write() reports zero bytes written for
 * a non-empty request). */
ssize_t aofRewriteBufferWrite(int fd) {
    listNode *ln;
    listIter li;
    ssize_t count = 0;

    listRewind(server.aof_rewrite_buf_blocks,&li);
    while((ln = listNext(&li))) {
        aofrwblock *block = listNodeValue(ln);
        size_t done = 0; /* Bytes of this block already written. */

        while (done < block->used) {
            ssize_t nwritten = write(fd,block->buf+done,block->used-done);

            if (nwritten < 0) {
                if (errno == EINTR) continue;
                return -1;
            }
            if (nwritten == 0) {
                /* No progress and no error code: report a generic I/O
                 * error rather than looping forever. */
                errno = EIO;
                return -1;
            }
            done += (size_t)nwritten;
            count += nwritten;
        }
    }
    return count;
}
复制代码

遍历aof_rewrite_buf_blocks节点,写入文件

//替换fd
            oldfd = server.aof_fd;
            server.aof_fd = newfd;
            //写盘
            if (server.aof_fsync == AOF_FSYNC_ALWAYS)
                redis_fsync(newfd);
            else if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
                aof_background_fsync(newfd);
            server.aof_selected_db = -1; /* Make sure SELECT is re-issued */
            aofUpdateCurrentSize();
            server.aof_rewrite_base_size = server.aof_current_size;
            server.aof_fsync_offset = server.aof_current_size;

            sdsfree(server.aof_buf);
            server.aof_buf = sdsempty();
复制代码

随后就是对文件进行替换、执行fsync、更新size等。

文件同步

通过上面可以知道,命令被写入到了buf中,重写命令只是顺带处理buf中的内容,还会有其他方法来对buf缓存进行同步,那就是flushAppendOnlyFile方法

同步有三种模式:

  • AOF_FSYNC_ALWAYS:将aof_buf中写入文件并且同步

  • AOF_FSYNC_EVERYSEC:写入文件后,如果上次同步的时间距离现在超过1s,那么对aof文件进行同步,同步操作交给后台线程执行

  • AOF_FSYNC_NO:将aof_buf中的所有内容写入到aof文件,但不对aof文件同步,同步由操作系统执行

void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;
    mstime_t latency;

    //仅当aof_buf不为空的时候才进行
    if (sdslen(server.aof_buf) == 0) {
        //策略为everysec、仍有已写入但未fsync的数据、距上次fsync已超过1s、并且后台没有fsync任务在进行的时候尝试fsync
        if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
            server.aof_fsync_offset != server.aof_current_size &&
            server.unixtime > server.aof_last_fsync &&
            !(sync_in_progress = aofFsyncInProgress())) {
            goto try_fsync;
        } else {
            return;
        }
    }
复制代码

flushAppendOnlyFile原则上是在aof_buf不为空的时候才进行的,但是如果设置了AOF_FSYNC_EVERYSEC,虽然buffer为空,但是还是有数据没有flush,所以调用try_fsync尝试一下

try_fsync:
    if (server.aof_no_fsync_on_rewrite && hasActiveChildProcess())
        return;
    
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        latencyStartMonitor(latency);
        //当前线程进行
        redis_fsync(server.aof_fd); 
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-fsync-always",latency);
        server.aof_fsync_offset = server.aof_current_size;
        server.aof_last_fsync = server.unixtime;
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        if (!sync_in_progress) {
            //另外线程处理
            aof_background_fsync(server.aof_fd);
            server.aof_fsync_offset = server.aof_current_size;
        }
        server.aof_last_fsync = server.unixtime;
    }
}
复制代码

if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        sync_in_progress = aofFsyncInProgress();

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        //已经有fsync任务进程正在运行
        if (sync_in_progress) {
            if (server.aof_flush_postponed_start == 0) {
                //重置推迟时间
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                //已经在等待fsync完成 但是时间还不足两秒 继续等待
                return;
            }
            //增加aof_delayed_fsync次数 并且提示
            server.aof_delayed_fsync++;
            serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }
复制代码

如果另外的线程已经在进行fsync,那么要推迟本次写入:第一次推迟时用aof_flush_postponed_start记录时间然后退出方法;推迟还不足2秒时继续等待;超过2秒则不再等待,累加aof_delayed_fsync并直接写入

//aof_flush_sleep是debug里面设定了 为了在flush之前挂起一段时间
    if (server.aof_flush_sleep && sdslen(server.aof_buf)) {
        usleep(server.aof_flush_sleep);
    }

    latencyStartMonitor(latency);
    //写入磁盘 没有flush
    nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    latencyEndMonitor(latency);
    //记录各种时间
    if (sync_in_progress) {
        latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
    } else if (hasActiveChildProcess()) {
        latencyAddSampleIfNeeded("aof-write-active-child",latency);
    } else {
        latencyAddSampleIfNeeded("aof-write-alone",latency);
    }
    latencyAddSampleIfNeeded("aof-write",latency);

    //执行了写入 将推迟记为0
    server.aof_flush_postponed_start = 0;
复制代码

写入aof_buf到文件中,并且返回写入的长度

    //长度不一致表示有异常发生 后面记录异常
    if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
        static time_t last_write_error_log = 0;
        int can_log = 0;

        if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
            can_log = 1;
            last_write_error_log = server.unixtime;
        }

        if (nwritten == -1) {
            if (can_log) {
                serverLog(LL_WARNING,"Error writing to the AOF file: %s",
                    strerror(errno));
                server.aof_last_write_errno = errno;
            }
        } else {
            if (can_log) {
                serverLog(LL_WARNING,"Short write while writing to "
                                       "the AOF file: (nwritten=%lld, "
                                       "expected=%lld)",
                                       (long long)nwritten,
                                       (long long)sdslen(server.aof_buf));
            }

            //利用ftruncate回退刚才的写入
            if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
                if (can_log) {
                    serverLog(LL_WARNING, "Could not remove short write "
                             "from the append-only file.  Redis may refuse "
                             "to load the AOF the next time it starts.  "
                             "ftruncate: %s", strerror(errno));
                }
            } else {
                nwritten = -1;
            }
            server.aof_last_write_errno = ENOSPC;
        }

        if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
            serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
            exit(1);
        } else {
            server.aof_last_write_status = C_ERR;

            //无法使用ftruncate撤回 只能确保已写入部分被记录
            if (nwritten > 0) {
                server.aof_current_size += nwritten;
                sdsrange(server.aof_buf,nwritten,-1);
            }
            return;
        }
    } else {
        //写入成功 打印日志 表示上一次失败了 但是这次成功了
        if (server.aof_last_write_status == C_ERR) {
            serverLog(LL_WARNING,
                "AOF write error looks solved, Redis can write again.");
            server.aof_last_write_status = C_OK;
        }
    }
    server.aof_current_size += nwritten;

    //小于4000的时候清除 大于直接删除然后分配新的
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        sdsclear(server.aof_buf);
    } else {
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }
复制代码

这段代码是判断写入的长度是否和aof_buf相等,如果不相等表示中间写入有错误,需要回退以及记录。

    if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
    
    run_with_period(1000) {
        if (server.aof_last_write_status == C_ERR)
            flushAppendOnlyFile(0);
    }
复制代码

serverCron里面有两个地方调用了flushAppendOnlyFile:一处是之前的flush被推迟(aof_flush_postponed_start不为0)时再次尝试,另一处是上一次写入出错(aof_last_write_status为C_ERR)时每秒重试一次

总结

以上就是redis的aof执行逻辑以及时机,flushAppendOnlyFile方法究竟是什么时候调用的?这一点留在后面redis网络部分讲

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享