Before getting into cluster/replication/sentinel, I want to cover the single-node functionality as thoroughly as possible. Previous posts went through Redis's startup flow, the event loop, and AOF/RDB; this time let's look at how Redis executes a client's command.
The event loop is what reads the client's data stream and drives command execution, so that is the entry point. The call chain is: readQueryFromClient -> processInputBuffer -> processCommandAndResetClient -> processCommand.
Let's see what processCommand does.
int processCommand(client *c) {
// 在执行命令前一般都要通过一组过滤器 比如是否接受这样的命令等等 没有默认过滤器
moduleCallCommandFilters(c);
/* The QUIT command is handled separately. Normal command procs will
* go through checking for replication and QUIT will cause trouble
* when FORCE_REPLICATION is enabled and would be implemented in
* a regular command proc.
* 代表当前client会与redis断开连接 在network模块会检查该标记并断开连接
* */
if (!strcasecmp(c->argv[0]->ptr,"quit")) {
addReply(c,shared.ok);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
return C_ERR;
}
/* Now lookup the command and check ASAP about trivial error conditions
* such as wrong arity, bad command name and so forth.
* 去命令池中寻找匹配的命令
* */
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
// command 未找到时 返回异常信息给client
if (!c->cmd) {
sds args = sdsempty();
int i;
for (i=1; i < c->argc && sdslen(args) < 128; i++)
args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
rejectCommandFormat(c,"unknown command `%s`, with args beginning with: %s",
(char*)c->argv[0]->ptr, args);
sdsfree(args);
return C_OK;
// 如果是参数数量不匹配的情况 将参数错误信息返回给client
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) {
rejectCommandFormat(c,"wrong number of arguments for '%s' command",
c->cmd->name);
return C_OK;
}
// 获取本次执行的command类型标识 如果是批任务,并且本次执行的是exec并且这组批任务中至少有一个是write
int is_write_command = (c->cmd->flags & CMD_WRITE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
// 代表当前command不应该在oom的情况下继续执行
int is_denyoom_command = (c->cmd->flags & CMD_DENYOOM) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_DENYOOM));
// 该command是否允许在游离的节点上执行
int is_denystale_command = !(c->cmd->flags & CMD_STALE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_STALE));
// 当前节点数据加载还未完成 是否允许执行
int is_denyloading_command = !(c->cmd->flags & CMD_LOADING) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_LOADING));
/* Check if the user is authenticated. This check is skipped in case
* the default user is flagged as "nopass" and is active.
* 判断当前用户是否已验权
* USER_FLAG_NOPASS 代表不需要密码 也就是不用验权
* USER_FLAG_DISABLED 代表当前用户不可用
* !c->authenticated 代表默认用户还未验权
* */
int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) ||
(DefaultUser->flags & USER_FLAG_DISABLED)) &&
!c->authenticated;
// 代表此时还未进行验权
if (auth_required) {
/* AUTH and HELLO and no auth modules are valid even in
* non-authenticated state.
* 如果要执行的command本身并不需要验权 就不会拦截 比如验权命令本身 或者是hello命令
* */
if (!(c->cmd->flags & CMD_NO_AUTH)) {
rejectCommand(c,shared.noautherr);
return C_OK;
}
}
/* Check if the user can run this command according to the current
* ACLs. */
int acl_keypos;
// 当用户认证成功后 会更改client->user 此时对用户权限进行校验,因为当前用户不一定有权限执行这个command
int acl_retval = ACLCheckCommandPerm(c,&acl_keypos); // 1
// 权限校验失败认为是client参数错误 本次processCommand还是成功的
if (acl_retval != ACL_OK) {
addACLLogEntry(c,acl_retval,acl_keypos,NULL);
if (acl_retval == ACL_DENIED_CMD)
rejectCommandFormat(c,
"-NOPERM this user has no permissions to run "
"the '%s' command or its subcommand", c->cmd->name);
else
rejectCommandFormat(c,
"-NOPERM this user has no permissions to access "
"one of the keys used as arguments");
return C_OK;
}
/* If cluster is enabled perform the cluster redirection here.
* However we don't perform the redirection if:
* 1) The sender of this command is our master.
* 2) The command has no key arguments.
* TODO 这里涉及到请求在集群内的转发 有关集群的逻辑之后看
* */
if (server.cluster_enabled &&
!(c->flags & CLIENT_MASTER) &&
!(c->flags & CLIENT_LUA &&
server.lua_caller->flags & CLIENT_MASTER) &&
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
c->cmd->proc != execCommand))
{
int hashslot;
int error_code;
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
&hashslot,&error_code);
if (n == NULL || n != server.cluster->myself) {
if (c->cmd->proc == execCommand) {
discardTransaction(c);
} else {
flagTransaction(c);
}
clusterRedirectClient(c,n,hashslot,error_code);
return C_OK;
}
}
/* Handle the maxmemory directive.
*
* Note that we do not want to reclaim memory if we are here re-entering
* the event loop since there is a busy Lua script running in timeout
* condition, to avoid mixing the propagation of scripts with the
* propagation of DELs due to eviction.
* 代表redis服务器有设置内存使用上限
* */
if (server.maxmemory && !server.lua_timedout) {
// 这里判断此时是否有足够的内存 释放内存的方式就会涉及到内存淘汰策略 某些命令不支持在oom的情况下继续执行
int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR; // 2
/* freeMemoryIfNeeded may flush slave output buffers. This may result
* into a slave, that may be the active client, to be freed.
* 在freeMemoryIfNeeded中 会将slave缓冲区中的数据尽可能的发送出去,是为了尽快的同步delCommand 避免slave节点内存不足,在这个过程中可能会间接导致与该client的连接断开(比如发生了什么错误)
* 在调用unlinkClient后就会将current_client置空 此时也就没有继续执行command的必要了
* */
if (server.current_client == NULL) return C_ERR;
// 判断当前command是否不允许在oom的情况下执行 比如get操作不会申请新内存 还是可以正常执行的
int reject_cmd_on_oom = is_denyoom_command;
/* If client is in MULTI/EXEC context, queuing may consume an unlimited
* amount of memory, so we want to stop that.
* However, we never want to reject DISCARD, or even EXEC (unless it
* contains denied commands, in which case is_denyoom_command is already
* set.
* 如果本次正在执行一组command 并且其中没有表示完结的exec/discard 那么之后可能会使用大量内存 以防万一就拒绝本次任务
* */
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand &&
c->cmd->proc != discardCommand) {
reject_cmd_on_oom = 1;
}
// 此时内存不足 且本次command无法在oom的情况下继续执行 将错误信息返回给client
if (out_of_memory && reject_cmd_on_oom) {
rejectCommand(c, shared.oomerr);
return C_OK;
}
}
/* Don't accept write commands if there are problems persisting on disk
* and if this is a master instance.
* 检查最近的一次aof/rdb写入是否正常
* */
int deny_write_type = writeCommandsDeniedByDiskError();
// 此时aof/rdb无法正常工作 顺便提一下aof默认是关闭的,redis可以仅依靠满足saveparam触发rdb的生成来实现可用性(但不是高可用,数据可能会丢失)
// 并且本节点是master节点 也就意味着无法通过与master节点的数据同步实现数据恢复 并且本次是一次写入命令 或者是一次探测命令(pingCommand同时兼具了检测对端节点能否正常工作的任务)
if (deny_write_type != DISK_ERROR_TYPE_NONE &&
server.masterhost == NULL &&
(is_write_command ||c->cmd->proc == pingCommand))
{
if (deny_write_type == DISK_ERROR_TYPE_RDB)
rejectCommand(c, shared.bgsaveerr);
else
rejectCommandFormat(c,
"-MISCONF Errors writing to the AOF file: %s",
strerror(server.aof_last_write_errno));
return C_OK;
}
/* Don't accept write commands if there are not enough good slaves and
* user configured the min-slaves-to-write option.
* 当前可以正常工作的slave节点数量低于要求的最小值, 无法执行命令 对最小可用slave数量有所限制是为了保障高可用性
* TODO 有关集群/副本/分片的逻辑之后梳理
* */
if (server.masterhost == NULL &&
server.repl_min_slaves_to_write &&
server.repl_min_slaves_max_lag &&
server.repl_good_slaves_count < server.repl_min_slaves_to_write)
{
rejectCommand(c, shared.noreplicaserr);
return C_OK;
}
/* Don't accept write commands if this is a read only slave. But
* accept write commands if this is our master.
* repl_slave_ro 代表当前slave节点是一个只读节点,无法执行写入命令
* */
if (server.masterhost && server.repl_slave_ro &&
!(c->flags & CLIENT_MASTER) &&
is_write_command)
{
rejectCommand(c, shared.roslaveerr);
return C_OK;
}
/* Only allow a subset of commands in the context of Pub/Sub if the
* connection is in RESP2 mode. With RESP3 there are no limits.
* 如果该client是作为本节点的订阅者,只能执行以下命令
* */
if ((c->flags & CLIENT_PUBSUB && c->resp == 2) &&
c->cmd->proc != pingCommand &&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand) {
rejectCommandFormat(c,
"Can't execute '%s': only (P)SUBSCRIBE / "
"(P)UNSUBSCRIBE / PING / QUIT are allowed in this context",
c->cmd->name);
return C_OK;
}
/* Only allow commands with flag "t", such as INFO, SLAVEOF and so on,
* when slave-serve-stale-data is no and we are a slave with a broken
* link with master.
* 当前slave节点处于游离状态时 是否允许执行命令 (游离状态就代表此时本节点的数据不是最新的 因为缺少了同步数据的目标节点)
* */
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
// repl_serve_stale_data 默认为1,也就是允许在游离节点上执行命令 在生产环境下为保证一致性应该将该配置设置成0
server.repl_serve_stale_data == 0 &&
is_denystale_command)
{
rejectCommand(c, shared.masterdownerr);
return C_OK;
}
/* Loading DB? Return an error if the command has not the
* CMD_LOADING flag.
* 此时还处于数据加载阶段 某些command就会被拒绝
* */
if (server.loading && is_denyloading_command) {
rejectCommand(c, shared.loadingerr);
return C_OK;
}
/* Exec the command
* 在通过参数校验 command执行权限校验 内存校验 节点状态校验后 终于开始执行command
* 如果发现本次执行的是一个批任务 并且本次不是表示终止的任务 将command加入到队列中
* */
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
// CMD_CALL_FULL passed here includes CMD_CALL_PROPAGATE, which is defined as CMD_CALL_PROPAGATE_AOF|CMD_CALL_PROPAGATE_REPL
// in other words, by default the command should be propagated both to the AOF and to the replicas
call(c,CMD_CALL_FULL); // 3
// TODO 副本相关
c->woff = server.master_repl_offset;
// 在执行完某些任务后 可能某些key会被加入到ready_keys (在执行dbAdd时就会将key设置到ready_keys中)
// 此时就可以唤醒阻塞等待这些key的client 什么情况会产生被阻塞的client?
if (listLength(server.ready_keys))
handleClientsBlockedOnKeys(); // 4
}
return C_OK;
}
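From the client's side, these early checks surface as error replies. A rough illustration (hedged; exact wording can differ between versions, and foo/k/v are made-up examples):
127.0.0.1:6379> foo bar
(error) ERR unknown command `foo`, with args beginning with: `bar`,
127.0.0.1:6379> get
(error) ERR wrong number of arguments for 'get' command
127.0.0.1:6379> get k          (with requirepass set but no AUTH yet)
(error) NOAUTH Authentication required.
127.0.0.1:6379> set k v        (maxmemory reached and the policy forbids eviction)
(error) OOM command not allowed when used memory > 'maxmemory'.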
The code above omits the Lua-script and tracing/monitoring related parts. There are four places worth digging into (marked 1-4 in the code):
1. Permission checking (ACL).
2. Checking memory / eviction.
3. Actually executing the command.
4. Once keys become ready, waking up clients that were previously blocked waiting for those keys.
Before a client can run Redis commands for the first time, it has to authenticate with a username/password; after that client->user is set. The command that performs this is authCommand. Leaving aside when exactly that command is sent, a client-side sketch follows, and then let's see what the permission check itself does.
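For reference, authentication from the client side looks roughly like this (a hedged sketch; myuser/mypassword are placeholders):
127.0.0.1:6379> AUTH mypassword                    (legacy single-argument form, default user)
OK
127.0.0.1:6379> AUTH myuser mypassword             (Redis 6 two-argument form for ACL users)
OK
127.0.0.1:6379> HELLO 3 AUTH myuser mypassword     (switch to RESP3 and authenticate in one step)
After a successful AUTH, authCommand marks the connection as authenticated and sets c->user, which is what the check below operates on.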
int ACLCheckCommandPerm(client *c, int *keyidxptr) {
// 该client绑定的用户
user *u = c->user;
// 本次要执行的command
uint64_t id = c->cmd->id;
/* If there is no associated user, the connection can run anything.
* client未绑定用户 默认就是通过校验
* */
if (u == NULL) return ACL_OK;
/* Check if the user can execute this command.
* authCommand 本身是不需要权限校验的
* 而如果当前用户设置了可以执行所有权限的标记 也不需要校验
* */
if (!(u->flags & USER_FLAG_ALLCOMMANDS) &&
c->cmd->proc != authCommand)
{
/* If the bit is not set we have to check further, in case the
* command is allowed just with that specific subcommand.
* 每个用户有自己的权限位图 因为command本身数量比较多 所以会想到用位图来表示 每个用户有哪些权限是在配置文件中设置的,在启动redis时会读取配置文件。 TODO 这里不细看了 基本就是通过位图实现权限校验
* */
if (ACLGetUserCommandBit(u,id) == 0) {
/* Check if the subcommand matches.
* 这里是command没有匹配上的情况 尝试匹配subcommand
* 当参数比较少 或者用户没有设置subCommand的情况 就返回错误信息
* */
if (c->argc < 2 ||
u->allowed_subcommands == NULL ||
u->allowed_subcommands[id] == NULL)
{
// 子命令匹配失败 返回权限校验失败
return ACL_DENIED_CMD;
}
long subid = 0;
while (1) {
if (u->allowed_subcommands[id][subid] == NULL)
return ACL_DENIED_CMD;
// 这里尝试匹配subcommand 只要匹配成功 还是允许执行命令的
if (!strcasecmp(c->argv[1]->ptr,
u->allowed_subcommands[id][subid]))
break; /* Subcommand match found. Stop here. */
subid++;
}
}
}
/* Check if the user can execute commands explicitly touching the keys
* mentioned in the command arguments.
* 某些key可能在某些command下不允许被使用 这里是进行检验
* */
if (!(c->user->flags & USER_FLAG_ALLKEYS) &&
// 下面这2个参数是辅助校验key的
(c->cmd->getkeys_proc || c->cmd->firstkey))
{
// 获取一个空的getKeyResult
getKeysResult result = GETKEYS_RESULT_INIT;
// numkeys 代表本次要检查的key数量 具体要检查哪些key会存储在result中
int numkeys = getKeysFromCommand(c->cmd,c->argv,c->argc,&result);
// keyidx对应参数下标
int *keyidx = result.keys;
for (int j = 0; j < numkeys; j++) {
listIter li;
listNode *ln;
listRewind(u->patterns,&li);
/* Test this key against every pattern.
* 针对参数级别进行校验 要求匹配pattern
* */
int match = 0;
while((ln = listNext(&li))) {
sds pattern = listNodeValue(ln);
size_t plen = sdslen(pattern);
// idx is the position of this key inside the command's argv array (not an index into the db)
int idx = keyidx[j];
if (stringmatchlen(pattern,plen,c->argv[idx]->ptr,
sdslen(c->argv[idx]->ptr),0))
{
match = 1;
break;
}
}
// 执行某些command时 某几个参数根据当前用户决定是否有执行权力
if (!match) {
if (keyidxptr) *keyidxptr = keyidx[j];
getKeysFreeResult(&result);
return ACL_DENIED_KEY;
}
}
getKeysFreeResult(&result);
}
/* If we survived all the above checks, the user can execute the
* command. */
return ACL_OK;
}
getKeysFromCommand() extracts which arguments of the current command are keys. The ACL check therefore boils down to: is the current user allowed to run this command (or subcommand), and does every key argument match one of the user's key patterns.
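As a concrete (hedged) illustration of what this enforces, consider an ACL user defined like this (user name, password and patterns are made up):
ACL SETUSER reporter on >s3cret ~report:* +get +mget +config|get
For this user, GET report:2020 passes both the command-bit check and the key-pattern check; GET users:1 fails the key check (-NOPERM ... keys used as arguments); DEL report:2020 fails the command check (-NOPERM ... 'del' command); and CONFIG GET maxmemory is allowed only through the allowed_subcommands table, while CONFIG SET is not.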
For operations that are not read-only, Redis also has to check whether there is enough memory; when there is not, the eviction (maxmemory) policy kicks in. The relevant configuration is sketched below, followed by the code.
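For reference, the knobs involved in this path live in redis.conf (a hedged sketch of commonly used settings):
maxmemory 2gb                    # server.maxmemory; 0 means no limit
maxmemory-policy allkeys-lru     # noeviction / volatile-lru / allkeys-lru / volatile-lfu / allkeys-lfu / volatile-random / allkeys-random / volatile-ttl
maxmemory-samples 5              # sample size used by evictionPoolPopulate()
replica-ignore-maxmemory yes     # server.repl_slave_ignore_maxmemory: replicas skip eviction and just mirror the master
lazyfree-lazy-eviction no        # whether eviction deletes keys with dbAsyncDelete (background free) instead of dbSyncDelete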
int freeMemoryIfNeeded(void) {
int keys_freed = 0;
/* By default replicas should ignore maxmemory
* and just be masters exact copies.
* 当前节点是副本 且设置了不需要考虑slave的内存情况 也就是内存控制任务完全放在master节点 其他节点只要确保同步正常就可以确保内存在合理范围之内
* */
if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK;
size_t mem_reported, mem_tofree, mem_freed;
mstime_t latency, eviction_latency, lazyfree_latency;
long long delta;
int slaves = listLength(server.slaves);
int result = C_ERR;
/* When clients are paused the dataset should be static not just from the
* POV of clients not being able to write, but also from the POV of
* expires and evictions of keys not being performed.
* 如果此时client都被暂停 此时不会发生新的写入操作 就认为内存是够用的
* TODO 发现在集群模块中会将某个client暂停 先搁置
* */
if (clientsArePaused()) return C_OK;
// 如果此时mem_reported < server.maxMem 就代表有足够的空间
// mem_reported 是此时检测已经使用的内存
// mem_tofree 是要释放的内存
if (getMaxmemoryState(&mem_reported, NULL, &mem_tofree, NULL) == C_OK)
return C_OK;
// 记录已经释放的内存空间
mem_freed = 0;
// 此时已经确定要释放内存了
latencyStartMonitor(latency);
// 这里就涉及到内存淘汰策略了 如果执行了不允许进行内存淘汰 跳转到cant_free
if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
goto cant_free; /* We need to free memory, but policy forbids. */
while (mem_freed < mem_tofree) {
int j, k, i;
// 这里要挨个遍历所有db 并尝试内存清理
static unsigned int next_db = 0;
sds bestkey = NULL;
int bestdbid;
redisDb *db;
dict *dict;
dictEntry *de;
// 当采用lru/lfu 或者 volatile_ttl的内存淘汰策略时
if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU | MAXMEMORY_FLAG_LFU) ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
// this pool (a fixed-size array) ranks eviction candidates by a score; each round only the highest-scoring candidate actually gets evicted
struct evictionPoolEntry *pool = EvictionPoolLRU;
// 通过下面的算法决定哪个key被淘汰的优先级最高
while (bestkey == NULL) {
// total_keys 总计会扫描到多少个key
unsigned long total_keys = 0, keys;
/* We don't want to make local-db choices when expiring keys,
* so to start populate the eviction pool sampling keys from
* every DB.
* After sampling from every DB, the pool holds at most EVPOOL_SIZE (16) keys with the highest scores, i.e. the best eviction candidates
* */
for (i = 0; i < server.dbnum; i++) {
db = server.db + i;
// 根据此时的内存淘汰策略 选择仅从设置了超时时间的key中淘汰对象 还是db下全key范围淘汰对象
dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
db->dict : db->expires;
if ((keys = dictSize(dict)) != 0) {
// 抽样计算得分并填充到evictionPool中
evictionPoolPopulate(i, dict, db->dict, pool);
total_keys += keys;
}
}
// 代表此时db下没有任何key/或者在expires中没有任何key 也就无法淘汰数据
if (!total_keys) break; /* No keys to evict. */
/* Go backward from best to worst element to evict.
* 从后往前看,因为在pool中idle得分越高的在越后面
* */
for (k = EVPOOL_SIZE - 1; k >= 0; k--) {
if (pool[k].key == NULL) continue;
bestdbid = pool[k].dbid;
// 回查之前存入的数据
if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
de = dictFind(server.db[pool[k].dbid].dict,
pool[k].key);
} else {
de = dictFind(server.db[pool[k].dbid].expires,
pool[k].key);
}
/* Remove the entry from the pool. */
if (pool[k].key != pool[k].cached)
sdsfree(pool[k].key);
pool[k].key = NULL;
pool[k].idle = 0;
/* If the key exists, is our pick. Otherwise it is
* a ghost and we need to try the next element. */
if (de) {
bestkey = dictGetKey(de);
break;
} else {
/* Ghost... Iterate again. */
}
}
}
}
/* volatile-random and allkeys-random policy
* 这里选择的是随机淘汰策略 上面的话扫描n个db后仅选出了一个key效率会不会太低?
* 这里随机选出一个key后就可以直接返回了
* */
else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM) {
/* When evicting a random key, we try to evict a key for
* each DB, so we use the static 'next_db' variable to
* incrementally visit all DBs. */
for (i = 0; i < server.dbnum; i++) {
j = (++next_db) % server.dbnum;
db = server.db + j;
dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
db->dict : db->expires;
if (dictSize(dict) != 0) {
de = dictGetRandomKey(dict);
bestkey = dictGetKey(de);
bestdbid = j;
break;
}
}
}
/* Finally remove the selected key.
* 代表本轮找到了合适的key
* */
if (bestkey) {
db = server.db + bestdbid;
robj *keyobj = createStringObject(bestkey, sdslen(bestkey));
// 因为该key将被强制淘汰 需要将信息同步到aof/副本上
propagateExpire(db, keyobj, server.lazyfree_lazy_eviction);
/* We compute the amount of memory freed by db*Delete() alone.
* It is possible that actually the memory needed to propagate
* the DEL in AOF and replication link is greater than the one
* we are freeing removing the key, but we can't account for
* that otherwise we would never exit the loop.
*
* Same for CSC invalidation messages generated by signalModifiedKey.
*
* AOF and Output buffer memory will be freed eventually so
* we only care about memory used by the key space. */
delta = (long long) zmalloc_used_memory();
latencyStartMonitor(eviction_latency);
// 判断是否采用惰性释放的方式删除对象
// 无论是同步还是异步都会立即从db中删除这个key 但是内存的释放可能会延后(调用dbAsyncDelete不一定就会通过bio进行释放 还要看本次要释放的redisObject大小
// 只有对象比较大的时候 回收耗时长 才会通过bio回收任务)
if (server.lazyfree_lazy_eviction)
dbAsyncDelete(db, keyobj);
else
dbSyncDelete(db, keyobj);
latencyEndMonitor(eviction_latency);
latencyAddSampleIfNeeded("eviction-del", eviction_latency);
delta -= (long long) zmalloc_used_memory();
// 计算此时已经被释放掉的内存 如果对象被惰性删除了 实际上这里内存还不会立即释放 还会继续抽样
mem_freed += delta;
server.stat_evictedkeys++;
signalModifiedKey(NULL, db, keyobj);
// 发出一个淘汰事件
notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
keyobj, db->id);
decrRefCount(keyobj);
keys_freed++;
/* When the memory to free starts to be big enough, we may
* start spending so much time here that is impossible to
* deliver data to the slaves fast enough, so we force the
* transmission here inside the loop.
* 提前将缓冲区的数据发送出去 主要是同步副本的删除命令 避免副本没有足够的空间执行之后的命令
* */
if (slaves) flushSlavesOutputBuffers();
/* Normally our stop condition is the ability to release
* a fixed, pre-computed amount of memory. However when we
* are deleting objects in another thread, it's better to
* check, from time to time, if we already reached our target
* memory, since the "mem_freed" amount is computed only
* across the dbAsyncDelete() call, while the thread can
* release the memory all the time.
* 如果采用的是惰性删除,内存的释放是异步的,那么每隔16轮主动去重新检测内存是否足够
* */
if (server.lazyfree_lazy_eviction && !(keys_freed % 16)) {
if (getMaxmemoryState(NULL, NULL, NULL, NULL) == C_OK) {
/* Let's satisfy our stop condition. */
mem_freed = mem_tofree;
}
}
} else {
// 代表本轮没有找到任何可以释放的key
goto cant_free; /* nothing to free... */
}
}
result = C_OK;
cant_free:
/* We are here if we are not able to reclaim memory. There is only one
* last thing we can try: check if the lazyfree thread has jobs in queue
* and wait...
* 代表本次没有释放足够的空间
* */
if (result != C_OK) {
latencyStartMonitor(lazyfree_latency);
// 阻塞等待后台任务处理完所有的内存释放任务 如果中途有足够的内存了 就可以返回ok 如果内存还是不足 且没有释放内存的任务了 本次就没有足够的内存空间
while (bioPendingJobsOfType(BIO_LAZY_FREE)) {
if (getMaxmemoryState(NULL, NULL, NULL, NULL) == C_OK) {
result = C_OK;
break;
}
// sleep for 1000 microseconds (1 ms) before re-checking
usleep(1000);
}
latencyEndMonitor(lazyfree_latency);
latencyAddSampleIfNeeded("eviction-lazyfree", lazyfree_latency);
}
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("eviction-cycle", latency);
return result;
}
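A quick way to observe this code path in practice (hedged sketch): the counter incremented through server.stat_evictedkeys shows up in INFO stats, and the notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted", ...) call can be surfaced via keyspace notifications:
127.0.0.1:6379> INFO stats
...
evicted_keys:42
...
127.0.0.1:6379> CONFIG SET notify-keyspace-events Ee
OK
127.0.0.1:6379> SUBSCRIBE __keyevent@0__:evicted
(each evicted key name is then delivered on this channel)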
getMaxmemoryState() is what checks the current memory usage.
int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level) {
size_t mem_reported, mem_used, mem_tofree;
/* Check if we are over the memory usage limit. If we are not, no need
* to subtract the slaves output buffers. We can just return ASAP.
* 获取此时使用的总内存
* */
mem_reported = zmalloc_used_memory();
if (total) *total = mem_reported;
/* We may return ASAP if there is no need to compute the level.
* If no maxmemory limit is configured, or usage is still below the limit, we can report the current state and return right away
* */
int return_ok_asap = !server.maxmemory || mem_reported <= server.maxmemory;
if (return_ok_asap && !level) return C_OK;
/* Remove the size of slaves output buffers and AOF buffer from the
* count of used memory. */
mem_used = mem_reported;
// 这里可以将aof和slave占用的内存忽略
size_t overhead = freeMemoryGetNotCountedMemory();
mem_used = (mem_used > overhead) ? mem_used - overhead : 0;
/* Compute the ratio of memory usage.
* TODO 先忽略level
* */
if (level) {
if (!server.maxmemory) {
*level = 0;
} else {
*level = (float) mem_used / (float) server.maxmemory;
}
}
if (return_ok_asap) return C_OK;
/* Check if we are still over the memory limit.
* 代表还有空间可用
* */
if (mem_used <= server.maxmemory) return C_OK;
/* Compute how much memory we need to free.
* 计算需要释放多少内存
* */
mem_tofree = mem_used - server.maxmemory;
if (logical) *logical = mem_used;
if (tofree) *tofree = mem_tofree;
return C_ERR;
}
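A worked example of the arithmetic above (all numbers made up):
maxmemory      = 100 MB
mem_reported   = 120 MB        (zmalloc_used_memory())
overhead       =   8 MB        (replica output buffers + AOF buffer, freeMemoryGetNotCountedMemory())
mem_used       = 120 - 8       = 112 MB
level          = 112 / 100     = 1.12
mem_tofree     = 112 - 100     = 12 MB   -> returned through *tofree, and the function returns C_ERR
That 12 MB becomes the target the eviction loop in freeMemoryIfNeeded() tries to reach.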
evictionPoolPopulate() samples redisObjects from a db and computes a score for each; the higher the score, the better the eviction candidate. The highest-scoring samples are kept in a pool, and each iteration of the outer loop evicts only one of them.
void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
int j, k, count;
// 根据最大的抽样数量 分配对应的内存空间
dictEntry *samples[server.maxmemory_samples];
// 这里随机抽取了一些key 并填充到samples中
count = dictGetSomeKeys(sampledict, samples, server.maxmemory_samples);
for (j = 0; j < count; j++) {
unsigned long long idle;
sds key;
robj *o;
dictEntry *de;
// the j-th entry randomly sampled above
de = samples[j];
key = dictGetKey(de);
/* If the dictionary we are sampling from is not the main
* dictionary (but the expires one) we need to lookup the key
* again in the key dictionary to obtain the value object.
* Unless the policy is volatile-ttl we need the value object itself, because the LRU/LFU
* metadata lives on the redisObject; the expires dict only stores expire times (a long),
* so if we sampled from expires we look the key up again in the main dict
* */
if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) {
// 从db.dict中查找 key,redisObject
if (sampledict != keydict) de = dictFind(keydict, key);
o = dictGetVal(de);
}
/* Calculate the idle time according to the policy. This is called
* idle just because the code initially handled LRU, but is in fact
* just a score where an higher score means better candidate.
* lru就是清理最久没有使用的key 这里获取该对象的idle时间
* */
if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) {
idle = estimateObjectIdleTime(o);
// 如果本次基于lfu算法 就是看某个对象在一段时间内的使用次数 这些因素会计算出一个数值 根据该数值来判断key的优先级
} else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
/* When we use an LRU policy, we sort the keys by idle time
* so that we expire keys starting from greater idle time.
* However when the policy is an LFU one, we have a frequency
* estimation, and we want to evict keys with lower frequency
* first. So inside the pool we put objects using the inverted
* frequency subtracting the actual frequency to the maximum
* frequency of 255.
* 这时idle是一个类似分数的概念 计算公式比较复杂 这里就不展开了
* */
idle = 255 - LFUDecrAndReturn(o);
// 根据该对象的过期时间计算idle
} else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
/* In this case the sooner the expire the better. */
idle = ULLONG_MAX - (long) dictGetVal(de);
} else {
// 忽略其余淘汰策略
serverPanic("Unknown eviction policy in evictionPoolPopulate()");
}
// 上面计算出了当前遍历到的样本的idle时间 idle越大代表淘汰该对象收益越高
/* Insert the element inside the pool.
* First, find the first empty bucket or the first populated
* bucket that has an idle time smaller than our idle time.
*
* */
k = 0;
// 在server启动后 pool被初始化时内部每个元素的被提前填充 idle为0 且key默认为null
// pool中的元素 idle是逐渐增大的
// 这里是在定位到一个合适的下标 那么淘汰时也是选择淘汰最后一个(idle最大 收益最高)
while (k < EVPOOL_SIZE &&
pool[k].key &&
pool[k].idle < idle)
k++;
// 代表此时idle小于pool中所有元素的idle 并且此时pool是满的 (移除的时候是从后往前移除 插入是从前往后)
if (k == 0 && pool[EVPOOL_SIZE - 1].key != NULL) {
/* Can't insert if the element is < the worst element we have
* and there are no empty buckets. */
continue;
// 代表本次元素会填充到某个slot中
} else if (k < EVPOOL_SIZE && pool[k].key == NULL) {
/* Inserting into empty position. No setup needed before insert. */
} else {
// 代表需要替换掉某个元素
/* Inserting in the middle. Now k points to the first element
* greater than the element to insert.
* 如果此时最后一个元素为空 必然会发生数据的迁移 也就是从插入的位置开始 将后面的数据全部往后移动一位
* */
if (pool[EVPOOL_SIZE - 1].key == NULL) {
/* Free space on the right? Insert at k shifting
* all the elements from k to end to the right. */
/* Save SDS before overwriting. */
sds cached = pool[EVPOOL_SIZE - 1].cached;
memmove(pool + k + 1, pool + k,
sizeof(pool[0]) * (EVPOOL_SIZE - k - 1));
pool[k].cached = cached;
} else {
/* No free space on right? Insert at k-1 */
// 这种情况代表pool已经满了 就会将插入位置之前的数据前移
k--;
/* Shift all elements on the left of k (included) to the
* left, so we discard the element with smaller idle time.*/
sds cached = pool[0].cached; /* Save SDS before overwriting. */
if (pool[0].key != pool[0].cached) sdsfree(pool[0].key);
memmove(pool, pool + 1, sizeof(pool[0]) * k);
pool[k].cached = cached;
}
}
/* Try to reuse the cached SDS string allocated in the pool entry,
* because allocating and deallocating this object is costly
* (according to the profiler, not my fantasy. Remember:
* premature optimization bla bla bla.
* Each pool entry pre-allocates a small fixed-size sds buffer (cached, EVPOOL_CACHED_SDS_SIZE bytes).
* Short keys are copied into that buffer instead of allocating a fresh sds for every sample, because
* this allocation showed up in profiling; it has no effect on the ranking logic itself.
* */
int klen = sdslen(key);
// 当key的长度没有超过最大缓存长度时 才会复用sds 否则会生成一个新的sds
if (klen > EVPOOL_CACHED_SDS_SIZE) {
pool[k].key = sdsdup(key);
} else {
// 将key拷贝到cache中
memcpy(pool[k].cached, key, klen + 1);
sdssetlen(pool[k].cached, klen);
pool[k].key = pool[k].cached;
}
// 将会被淘汰的entry会设置到pool中
pool[k].idle = idle;
pool[k].dbid = dbid;
}
}
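The idle/frequency values computed above can also be inspected from the CLI (hedged sketch; somekey is a placeholder, and OBJECT FREQ only works while an LFU policy is active):
127.0.0.1:6379> OBJECT IDLETIME somekey
(integer) 347                  (approximate seconds since last access, the same information estimateObjectIdleTime() is based on)
127.0.0.1:6379> CONFIG SET maxmemory-policy allkeys-lfu
OK
127.0.0.1:6379> OBJECT FREQ somekey
(integer) 5                    (the logarithmic access counter that LFUDecrAndReturn() reads)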
After that, call() is what really executes the command; commands that qualify are also written to the AOF and propagated to the replicas.
void call(client *c, int flags) {
long long dirty;
ustime_t start, duration;
int client_old_flags = c->flags;
struct redisCommand *real_cmd = c->cmd;
server.fixed_time_expire++;
/* Initialization: clear the flags that must be set by the command on
* demand, and initialize the array for additional commands propagation.
* 去掉这些标记 避免影响之后的判断
* */
c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
// 每当执行某个新的command时 重置also_propagate 这是不同于事务的另一套体系
redisOpArray prev_also_propagate = server.also_propagate;
redisOpArrayInit(&server.also_propagate);
/* Call the command.
* 先记录此时脏数据数量
* */
dirty = server.dirty;
updateCachedTime(0);
start = server.ustime;
// 执行command 此时可能会发起对某些redisObject的操作 同时可能会将某些信息写入到client的缓冲区
c->cmd->proc(c);
duration = ustime()-start;
// how many changes the command made to the dataset
dirty = server.dirty-dirty;
if (dirty < 0) dirty = 0;
/* After executing command, we will close the client after writing entire
* reply if it is set 'CLIENT_CLOSE_AFTER_COMMAND' flag.
* 如果此时发现了CLIENT_CLOSE_AFTER_COMMAND 标记 需要更新成CLIENT_CLOSE_AFTER_REPLY
* */
if (c->flags & CLIENT_CLOSE_AFTER_COMMAND) {
c->flags &= ~CLIENT_CLOSE_AFTER_COMMAND;
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
}
// TODO 忽略统计项
if (flags & CMD_CALL_STATS) {
/* use the real command that was executed (cmd and lastamc) may be
* different, in case of MULTI-EXEC or re-written commands such as
* EXPIRE, GEOADD, etc. */
real_cmd->microseconds += duration;
real_cmd->calls++;
}
/* Propagate the command into the AOF and replication link
* The caller asked for propagation (CMD_CALL_PROPAGATE) and the command did not suppress it entirely via CLIENT_PREVENT_PROP, so decide below what actually needs to be propagated
* */
if (flags & CMD_CALL_PROPAGATE &&
(c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
{
// 是否会传播需要通过下面的逻辑判断 比如执行multi实际上不会触发传播 而在执行exec命令时会将multi包括内部所有子命令一起传播 (前提是内部所有command中只要有一条满足传播条件,也就是非只读,非admin)
int propagate_flags = PROPAGATE_NONE;
/* Check if the command operated changes in the data set. If so
* set for replication / AOF propagation.
* 代表至少有某个数据发生了变化 这时就要将command同步到aof/副本上
* */
if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
/* If the client forced AOF / replication of the command, set
* the flags regardless of the command effects on the data set.
* 可能数据没有变化 根据执行的command判断需要同步的是哪种类型
* */
if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
/* However prevent AOF / replication propagation if the command
* implementation called preventCommandPropagation() or similar,
* or if we don't have the call() flags to do so.
* 目前仅在set/stream中看到设置保护模式 先忽略
* 可能某些情况下此时不适合同步到aof或者是副本上 这里情况就不展开了
* */
if (c->flags & CLIENT_PREVENT_REPL_PROP ||
!(flags & CMD_CALL_PROPAGATE_REPL))
propagate_flags &= ~PROPAGATE_REPL;
if (c->flags & CLIENT_PREVENT_AOF_PROP ||
!(flags & CMD_CALL_PROPAGATE_AOF))
propagate_flags &= ~PROPAGATE_AOF;
/* Call propagate() only if at least one of AOF / replication
* propagation is needed. Note that modules commands handle replication
* in an explicit way, so we never replicate them automatically.
* 将本次操作写入到aof以及同步到其他副本
* */
if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
}
/* Restore the old replication flags, since call() can be executed
* recursively. 这2步是恢复调用前的flags */
c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
c->flags |= client_old_flags &
(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
// 以上已经执行完command 并且同时写入到了aof和副本中 (对于aof并不会同步刷盘,写入效率高,但是存在数据丢失问题,接下来就要看如何解决这个问题,副本的写入是异步的)
/* Handle the alsoPropagate() API to handle commands that want to propagate
* multiple separated commands. Note that alsoPropagate() is not affected
* by CLIENT_PREVENT_PROP flag.
* also_propagate is a general mechanism, not module-specific: a command implementation can call alsoPropagate() to emit additional commands to the AOF/replicas, different from what the client originally sent. The details are skipped here.
* */
if (server.also_propagate.numops) {
int j;
redisOp *rop;
if (flags & CMD_CALL_PROPAGATE) {
int multi_emitted = 0;
/* Wrap the commands in server.also_propagate array,
* but don't wrap it if we are already in MULTI context,
* in case the nested MULTI/EXEC.
*
* And if the array contains only one command, no need to
* wrap it, since the single command is atomic.
* 这里可以简单理解成传播了一个事务命令
* */
if (server.also_propagate.numops > 1 &&
!(c->cmd->flags & CMD_MODULE) &&
!(c->flags & CLIENT_MULTI) &&
!(flags & CMD_CALL_NOWRAP))
{
execCommandPropagateMulti(c);
multi_emitted = 1;
}
for (j = 0; j < server.also_propagate.numops; j++) {
rop = &server.also_propagate.ops[j];
int target = rop->target;
/* Whatever the command wish is, we honor the call() flags. */
if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
if (target)
propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
}
if (multi_emitted) {
execCommandPropagateExec(c);
}
}
redisOpArrayFree(&server.also_propagate);
}
// 还原数据 看来会存在嵌套的场景
server.also_propagate = prev_also_propagate;
server.fixed_time_expire--;
server.stat_numcommands++;
/* Record peak memory after each command and before the eviction that runs
* before the next command. */
size_t zmalloc_used = zmalloc_used_memory();
if (zmalloc_used > server.stat_peak_memory)
server.stat_peak_memory = zmalloc_used;
}
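To make the dirty-based propagation decision concrete, here is a minimal, hypothetical command proc written in the style of the real ones (MYSET/mySetCommand are made up and the helper calls are schematic, but bumping server.dirty is exactly the convention the real write commands follow):
/* Hypothetical write command: MYSET key value.
 * call() snapshots server.dirty before running c->cmd->proc(c) and compares it
 * afterwards; if the counter moved, the command is propagated to AOF/replicas. */
void mySetCommand(client *c) {
    robj *key = c->argv[1];
    robj *val = c->argv[2];

    setKey(c, c->db, key, val);        /* modify the keyspace */
    server.dirty++;                    /* tell call() the dataset changed -> propagate */
    notifyKeyspaceEvent(NOTIFY_STRING, "set", key, c->db->id);
    addReply(c, shared.ok);
}
A read-only proc such as getCommand never touches server.dirty, so dirty stays 0 and (absent CLIENT_FORCE_AOF/CLIENT_FORCE_REPL) nothing is propagated.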
The code above skips the monitoring (slowlog/latency) and Lua-script parts. The core is a single call, c->cmd->proc(c). Operating on a redisObject means locating it by db and key (at the API level this corresponds to Redis's string, list, set, zset and stream types; underneath, to data structures such as ziplist, quicklist, intset, dict and so on, which I won't cover individually here; the more involved ones are ziplist and zskiplist) and then running the relevant function to create, read, update or delete it. Finally, these operations may make certain keys "ready": for example a new redisObject is inserted while some client happens to be blocked waiting for exactly such a key; that client can then be unblocked and its previously blocked command executed. This is handleClientsBlockedOnKeys(); a small two-client illustration comes right after this paragraph, followed by the function itself.
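A minimal two-client illustration of how a blocked client arises and gets served (hedged; jobs/task-1 are made-up names):
# client A
127.0.0.1:6379> BLPOP jobs 0            # list "jobs" is empty -> A is recorded in db->blocking_keys and suspended

# client B, meanwhile
127.0.0.1:6379> LPUSH jobs "task-1"     # the push marks "jobs" as ready (server.ready_keys)
(integer) 1

# back on client A, woken up via handleClientsBlockedOnKeys -> serveClientsBlockedOnListKey
1) "jobs"
2) "task-1"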
void handleClientsBlockedOnKeys(void) {
while(listLength(server.ready_keys) != 0) {
list *l;
/* Point server.ready_keys to a fresh list and save the current one
* locally. This way as we run the old list we are free to call
* signalKeyAsReady() that may push new elements in server.ready_keys
* when handling clients blocked into BRPOPLPUSH. */
l = server.ready_keys;
server.ready_keys = listCreate();
while(listLength(l) != 0) {
listNode *ln = listFirst(l);
readyList *rl = ln->value;
/* First of all remove this key from db->ready_keys so that
* we can safely call signalKeyAsReady() against this key.
* 将本轮要处理的key从ready_key中移除
* */
dictDelete(rl->db->ready_keys,rl->key);
/* Even if we are not inside call(), increment the call depth
* in order to make sure that keys are expired against a fixed
* reference time, and not against the wallclock time. This
* way we can lookup an object multiple times (BRPOPLPUSH does
* that) without the risk of it being freed in the second
* lookup, invalidating the first one.
* See https://github.com/antirez/redis/pull/6554. */
server.fixed_time_expire++;
updateCachedTime(0);
/* Serve clients blocked on the key.
* 找到此时正在阻塞的那个redisObject
* */
robj *o = lookupKeyWrite(rl->db,rl->key);
// 按照robj的类型走不同的处理逻辑
if (o != NULL) {
if (o->type == OBJ_LIST)
serveClientsBlockedOnListKey(o,rl);
else if (o->type == OBJ_ZSET)
// 阻塞等待的是zset类型的robj
serveClientsBlockedOnSortedSetKey(o,rl);
else if (o->type == OBJ_STREAM)
// TODO 先忽略stream类型
serveClientsBlockedOnStreamKey(o,rl);
/* We want to serve clients blocked on module keys
* regardless of the object type: we don't know what the
* module is trying to accomplish right now. */
serveClientsBlockedOnKeyByModule(rl);
}
server.fixed_time_expire--;
/* Free this item. */
decrRefCount(rl->key);
zfree(rl);
listDelNode(l,ln);
}
listRelease(l); /* We have the new list on place at this point. */
}
}
Let's only look at the list case; the others are similar.
/* Helper function for handleClientsBlockedOnKeys(). This function is called
* when there may be clients blocked on a list key, and there may be new
* data to fetch (the key is ready).
* @param rl a single ready-key entry (db + key); this call checks whether clients are blocked on that key in db->blocking_keys and unblocks them (the key they were waiting for is now ready)
* */
void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
/* We serve clients in the same order they blocked for
* this key, from the first blocked to the last.
* */
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
// 代表在对应的db下 该key也存在于blocking_keys
if (de) {
list *clients = dictGetVal(de);
// 代表有多少client被阻塞在该key下
int numclients = listLength(clients);
while(numclients--) {
listNode *clientnode = listFirst(clients);
client *receiver = clientnode->value;
// 本次client的阻塞类型不是list类型 将该节点移动到链表尾部
if (receiver->btype != BLOCKED_LIST) {
/* Put at the tail, so that at the next call
* we'll not run into it again. */
listRotateHeadToTail(clients);
continue;
}
// 本次阻塞的对象是一个list 此时该对象已经准备完成后 查看之前被阻塞的client想要执行的command是什么 并在此时重新执行
// 如果dstKey不为空
robj *dstkey = receiver->bpop.target;
int where = (receiver->lastcmd &&
receiver->lastcmd->proc == blpopCommand) ?
LIST_HEAD : LIST_TAIL;
robj *value = listTypePop(o,where);
if (value) {
/* Protect receiver->bpop.target, that will be
* freed by the next unblockClient()
* call. */
if (dstkey) incrRefCount(dstkey);
// 此时client 已经等到了数据 可以从阻塞状态解除了 在这里会将client从blocking_keys对应的robjList移除
unblockClient(receiver);
// 将准备好的信息发送到client上 如果本次设置了 dstkey 就是将value从原robj转移到另一个robj上
if (serveClientBlockedOnList(receiver,
rl->key,dstkey,rl->db,value,
where) == C_ERR)
{
/* If we failed serving the client we need
* to also undo the POP operation.
* 本次操作失败 将元素归还到robj中 期望之后的client可以正常处理
* */
listTypePush(o,value,where);
}
if (dstkey) decrRefCount(dstkey);
decrRefCount(value);
} else {
// 如果此时redisObject中的元素被取完了 相当于还未准备好数据
break;
}
}
}
// 代表此时该redisObject的元素被使用完了 如果某些client没有消费到这些对象 他们将继续保持block状态
if (listTypeLength(o) == 0) {
dbDelete(rl->db,rl->key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",rl->key,rl->db->id);
}
/* We don't call signalModifiedKey() as it was already called
* when an element was pushed on the list. */
}
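The dstkey branch above (receiver->bpop.target) corresponds to commands like BRPOPLPUSH, where the popped value is pushed onto another list instead of only being returned (hedged sketch; pending/processing are made-up names):
# client A
127.0.0.1:6379> BRPOPLPUSH pending processing 0    # blocks; bpop.target = "processing"

# client B
127.0.0.1:6379> LPUSH pending "job-7"
(integer) 1

# client A unblocks: the element is moved to "processing" and returned
"job-7"
127.0.0.1:6379> LRANGE processing 0 -1
1) "job-7"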
That is the whole flow of executing a command. The reply is written into the client's output buffer, and the event loop flushes it to the socket asynchronously (in beforeSleep / the connection's write handler, rather than in serverCron).
While reading this I ran into a few open questions, mainly:
1. Under what circumstances do blocked clients arise?
2. In a cluster's master-replica setup, is it always the master that receives commands? This indirectly determines whether every node in a master-replica group maintains the routing table or only the master does. If every node maintains it, how is it kept in sync; if only the master does, how is a crash handled? And when does the routing table actually come into play?
3. When a key expires, how is that propagated to the other nodes, and how do they handle it?
These questions are probably all cluster-related (blocking I'm not sure about; it is rarely used in my own business scenarios, and I wouldn't rule out that it mainly exists to serve cluster/replication use cases). Up next: cluster, replicas and sentinel.