Preface
The previous article covered how, during Redis server startup, the event loop is initialized and epoll (and friends) are set up, creating event handlers for client connections; once a client connects, a read event is registered for it. This article looks at how Redis handles commands from the multithreading angle.
Main text
Redis 6.0 added multithreading, but as the earlier source analysis showed, the underlying data structures take no locks on insertion or deletion, so we can conclude that the new threads are not for concurrent data manipulation; they are I/O threads.
Threaded I/O means: multiple threads read client data into input buffers, the main thread then executes the command logic and stores the results in output buffers, and finally multiple threads write the replies back.
The benefit is that blocking of the main thread caused by network I/O is reduced.
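For context, threaded I/O is off by default and is enabled through redis.conf; these directives map to the server.io_threads_num and server.io_threads_do_reads fields used in the code below (4 is just an example value):

# 1 (the default) means threaded I/O is disabled; values > 1 enable it
io-threads 4
# By default only writes are offloaded to the I/O threads;
# threaded reads must be enabled explicitly
io-threads-do-reads yes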
Multithreading initialization
bio initialization
void InitServerLast() {
bioInit();
initThreadedIO();
set_jemalloc_bg_thread(server.jemalloc_bg_thread);
server.initial_memory_usage = zmalloc_used_memory();
}
After initServer, the server calls InitServerLast, which performs the bio and I/O-thread initialization.
void bioInit(void) {
pthread_attr_t attr;
pthread_t thread;
size_t stacksize;
int j;
//Initialize the mutexes, condition variables and job queues
for (j = 0; j < BIO_NUM_OPS; j++) {
pthread_mutex_init(&bio_mutex[j],NULL);
pthread_cond_init(&bio_newjob_cond[j],NULL);
pthread_cond_init(&bio_step_cond[j],NULL);
bio_jobs[j] = listCreate();
bio_pending[j] = 0;
}
pthread_attr_init(&attr);
pthread_attr_getstacksize(&attr,&stacksize);
if (!stacksize) stacksize = 1;
while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
pthread_attr_setstacksize(&attr, stacksize);
//Create the background threads
for (j = 0; j < BIO_NUM_OPS; j++) {
void *arg = (void*)(unsigned long) j;
if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
exit(1);
}
bio_threads[j] = thread;
}
}
bioInit creates three threads, each running bioProcessBackgroundJobs and identified by the type values 0, 1 and 2.
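Those three type values correspond to the constants defined in bio.h (reproduced here for reference):

/* bio.h: the background job types */
#define BIO_CLOSE_FILE    0 /* Deferred close(2) syscall. */
#define BIO_AOF_FSYNC     1 /* Deferred AOF fsync. */
#define BIO_LAZY_FREE     2 /* Deferred objects freeing. */
#define BIO_NUM_OPS       3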
void *bioProcessBackgroundJobs(void *arg) {
struct bio_job *job;
unsigned long type = (unsigned long) arg;
sigset_t sigset;
if (type >= BIO_NUM_OPS) {
serverLog(LL_WARNING,
"Warning: bio thread started with wrong type %lu",type);
return NULL;
}
switch (type) {
case BIO_CLOSE_FILE:
redis_set_thread_title("bio_close_file");
break;
case BIO_AOF_FSYNC:
redis_set_thread_title("bio_aof_fsync");
break;
case BIO_LAZY_FREE:
redis_set_thread_title("bio_lazy_free");
break;
}
As shown above, the three threads are: the file-close thread, the AOF fsync thread, and the lazy-free thread.
while(1) {
listNode *ln;
/* The loop always starts with the lock hold. */
if (listLength(bio_jobs[type]) == 0) {
pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
continue;
}
//Take one job from the head of the queue
ln = listFirst(bio_jobs[type]);
job = ln->value;
pthread_mutex_unlock(&bio_mutex[type]);
//Actually process the job
if (type == BIO_CLOSE_FILE) {
close((long)job->arg1);
} else if (type == BIO_AOF_FSYNC) {
redis_fsync((long)job->arg1);
} else if (type == BIO_LAZY_FREE) {
if (job->arg1)
lazyfreeFreeObjectFromBioThread(job->arg1);
else if (job->arg2 && job->arg3)
lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
else if (job->arg3)
lazyfreeFreeSlotsMapFromBioThread(job->arg3);
} else {
serverPanic("Wrong job type in bioProcessBackgroundJobs().");
}
zfree(job);
pthread_mutex_lock(&bio_mutex[type]);
listDelNode(bio_jobs[type],ln);
bio_pending[type]--;
pthread_cond_broadcast(&bio_step_cond[type]);
}
Each of these threads pulls nodes off the bio_jobs list for its type and handles them accordingly. For example, recall this code from the earlier AOF source analysis:
void aof_background_fsync(int fd) {
bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
}
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
struct bio_job *job = zmalloc(sizeof(*job));
job->time = time(NULL);
job->arg1 = arg1;
job->arg2 = arg2;
job->arg3 = arg3;
pthread_mutex_lock(&bio_mutex[type]);
listAddNodeTail(bio_jobs[type],job);
bio_pending[type]++;
pthread_cond_signal(&bio_newjob_cond[type]);
pthread_mutex_unlock(&bio_mutex[type]);
}
A job is submitted by appending a node to the list; note that this is done under the corresponding mutex, with a signal on the condition variable to wake the worker.
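The pattern here is a classic mutex + condition variable producer/consumer queue. Below is a minimal, self-contained sketch of the same handshake for a single job type (an illustration, not Redis code):

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

/* A FIFO job queue protected by a mutex, with a condition variable used to
 * wake the consumer, mirroring bio_jobs/bio_mutex/bio_newjob_cond for one
 * job type. */
struct job { long arg; struct job *next; };

static pthread_mutex_t q_mutex  = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  q_newjob = PTHREAD_COND_INITIALIZER;
static struct job *q_head = NULL, *q_tail = NULL;

/* Producer side: roughly what bioCreateBackgroundJob() does. */
static void submit_job(long arg) {
    struct job *j = malloc(sizeof(*j));
    j->arg = arg;
    j->next = NULL;
    pthread_mutex_lock(&q_mutex);
    if (q_tail) q_tail->next = j; else q_head = j;   /* append to tail */
    q_tail = j;
    pthread_cond_signal(&q_newjob);                  /* wake the worker */
    pthread_mutex_unlock(&q_mutex);
}

/* Consumer side: roughly what bioProcessBackgroundJobs() does. */
static void *worker(void *unused) {
    (void)unused;
    pthread_mutex_lock(&q_mutex);          /* the loop starts with the lock held */
    while (1) {
        while (q_head == NULL)
            pthread_cond_wait(&q_newjob, &q_mutex);
        struct job *j = q_head;            /* take a job from the head */
        q_head = j->next;
        if (q_head == NULL) q_tail = NULL;
        pthread_mutex_unlock(&q_mutex);    /* do the slow work without the lock */
        if (j->arg < 0) { free(j); return NULL; }   /* sentinel: stop the worker */
        printf("processing job %ld\n", j->arg);
        free(j);
        pthread_mutex_lock(&q_mutex);      /* re-acquire before checking again */
    }
}

int main(void) {
    pthread_t tid;
    pthread_create(&tid, NULL, worker, NULL);
    for (long i = 0; i < 5; i++) submit_job(i);
    submit_job(-1);                        /* tell the worker to exit */
    pthread_join(tid, NULL);
    return 0;
}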
To summarize bio initialization: three threads are created to handle file closing, AOF fsync and lazy free respectively. Offloading this logic, which does not touch the main service path, to background threads improves overall efficiency.
I/O thread initialization
void initThreadedIO(void) {
io_threads_active = 0; /* We start with threads not active. */
//io_threads_num == 1 means threaded I/O is disabled, nothing to do
if (server.io_threads_num == 1) return;
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}
for (int i = 0; i < server.io_threads_num; i++) {
/* Things we do for all the threads including the main thread. */
io_threads_list[i] = listCreate();
if (i == 0) continue; /* Thread 0 is the main thread. */
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[i],NULL);
io_threads_pending[i] = 0;
//Lock the thread's mutex so the new thread starts out parked
pthread_mutex_lock(&io_threads_mutex[i]);
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
io_threads[i] = tid;
}
}
Here io_threads_num - 1 additional threads are created (index 0 is reserved for the main thread), and each thread's mutex is locked before the thread is spawned; the core logic lives in IOThreadMain.
void *IOThreadMain(void *myid) {
long id = (unsigned long)myid;
char thdname[16];
//Set the thread name for debugging/ps output
snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
redis_set_thread_title(thdname);
redisSetCpuAffinity(server.server_cpulist);
while(1) {
//Spin for a while waiting for pending jobs to show up
for (int j = 0; j < 1000000; j++) {
if (io_threads_pending[id] != 0) break;
}
//Note the lock/unlock pair below
if (io_threads_pending[id] == 0) {
//The mutex was locked before this thread was created, so this lock blocks the thread here until startThreadedIO unlocks it
pthread_mutex_lock(&io_threads_mutex[id]);
//Unlock immediately so the lock taken in stopThreadedIO does not deadlock with the one above
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
serverAssert(io_threads_pending[id] != 0);
if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));
/* Process: note that the main thread will never touch our list
* before we drop the pending count to 0. */
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
io_threads_pending[id] = 0;
if (tio_debug) printf("[%ld] Done\n", id);
}
}
Note the commented lock/unlock pair above. Two arrays are involved: io_threads_pending, which records how many clients are waiting to be handled by each thread, and io_threads_list, which holds those clients. Each I/O thread initially blocks on pthread_mutex_lock until some other code unlocks its mutex; it then runs its main logic, walking io_threads_list[id] and, depending on io_threads_op, calling writeToClient for writes or readQueryFromClient for reads.
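The lock/unlock trick is essentially a way for the main thread to park and resume workers without a dedicated condition variable: the main thread pre-locks each worker's mutex, and the worker parks itself by trying to acquire it. A minimal sketch of just this mechanism for a single worker (hypothetical names, not Redis code):

#include <pthread.h>
#include <stdio.h>
#include <stdatomic.h>

/* One worker parked/resumed via a mutex the main thread pre-locks, plus an
 * atomic "pending" counter, mirroring io_threads_mutex[id] and
 * io_threads_pending[id] for one thread. */
static pthread_mutex_t park_mutex = PTHREAD_MUTEX_INITIALIZER;
static atomic_ulong pending = 0;

static void *io_worker(void *unused) {
    (void)unused;
    while (1) {
        /* Spin briefly hoping that work shows up (cheap while active). */
        for (int j = 0; j < 1000000; j++)
            if (atomic_load(&pending) != 0) break;

        if (atomic_load(&pending) == 0) {
            /* No work: block on the mutex held by the main thread; we stay
             * parked here until the main thread unlocks it. */
            pthread_mutex_lock(&park_mutex);
            /* Unlock right away so the main thread can lock it again later
             * (to park us once more) without deadlocking. */
            pthread_mutex_unlock(&park_mutex);
            continue;
        }
        printf("worker: handling %lu jobs\n", atomic_load(&pending));
        atomic_store(&pending, 0);       /* signal completion to the main thread */
    }
    return NULL;
}

int main(void) {
    pthread_t tid;
    pthread_mutex_lock(&park_mutex);     /* park the worker from the start */
    pthread_create(&tid, NULL, io_worker, NULL);

    atomic_store(&pending, 3);           /* hand over 3 jobs ...            */
    pthread_mutex_unlock(&park_mutex);   /* ... and resume the worker       */
    while (atomic_load(&pending) != 0);  /* busy-wait "join", as Redis does */

    pthread_mutex_lock(&park_mutex);     /* park it again (cf. stopThreadedIO) */
    printf("main: worker parked again\n");
    return 0;
}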
The core of threaded I/O
Like the bio threads, the I/O threads wait for work to appear in their lists. So when does work get added?
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}
The last thing the server's main function does is call aeMain, which runs the event loop.
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
//Nothing to do, return ASAP
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
long now_sec, now_ms;
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
//Compute how long to wait until the nearest timer fires
long long ms =
(shortest->when_sec - now_sec)*1000 +
shortest->when_ms - now_ms;
if (ms > 0) {
tvp->tv_sec = ms/1000;
tvp->tv_usec = (ms % 1000)*1000;
} else {
tvp->tv_sec = 0;
tvp->tv_usec = 0;
}
} else {
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
tvp = NULL;
}
}
if (eventLoop->flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
}
This first part computes how long aeApiPoll is allowed to block, based on the nearest timer event; we will not dwell on it here, except to note that serverCron is ultimately invoked from this loop via processTimeEvents further down.
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);
//Get the number of fired events
numevents = aeApiPoll(eventLoop, tvp);
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
Here the order is beforesleep, then aeApiPoll, then aftersleep. aeApiPoll blocks for at most the computed timeout, waiting for events to arrive. Setting beforesleep and aftersleep aside for the moment, aeApiPoll returns the number of events that are ready.
for (j = 0; j < numevents; j++) {
//Look up the event via the fired array
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0;
//AE_BARRIER inverts the order: write first, then read, instead of the default read-then-write
int invert = fe->mask & AE_BARRIER;
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
//Refresh the pointer in case the events array was resized
fe = &eventLoop->events[fd];
}
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
if (invert) {
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
if ((fe->mask & mask & AE_READABLE) &&
(!fired || fe->wfileProc != fe->rfileProc))
{
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
}
//Fire time events
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
When events arrive, rfileProc or wfileProc is invoked according to the event type; for client connections these end up in readQueryFromClient for reads and writeToClient for writes. Both the I/O threads and the main thread call readQueryFromClient, so let's look at what it does.
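As a reminder of where rfileProc comes from: readQueryFromClient is installed as the connection's read handler when the client object is created. The snippet below is an abridged sketch of that registration in createClient (only the relevant calls are kept; quoted from memory of the 6.0 sources, so treat it as approximate):

client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));
    if (conn) {
        /* ... non-blocking / keepalive setup elided ... */
        connSetReadHandler(conn, readQueryFromClient);  /* read handler */
        connSetPrivateData(conn, c);                    /* conn -> client */
    }
    /* ... the rest of the client initialization is elided ... */
    return c;
}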
void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, readlen;
size_t qblen;
//Check if we want to postpone reading from the client until after the event loop returns; this is the case when threaded I/O is enabled
if (postponeClientRead(c)) return;
.....
int postponeClientRead(client *c) {
if (io_threads_active &&
server.io_threads_do_reads &&
!ProcessingEventsWhileBlocked &&
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
{
c->flags |= CLIENT_PENDING_READ;
listAddNodeHead(server.clients_pending_read,c);
return 1;
} else {
return 0;
}
}
The first thing readQueryFromClient does is check whether threaded I/O is enabled; if so, the current client is added to the clients_pending_read list and the function returns.
With threaded I/O enabled, the data is not read during this iteration of the event loop; the client is merely queued in clients_pending_read. The actual read happens in the next iteration, via the beforeSleep callback.
void beforeSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop);
/* Just call a subset of vital functions in case we are re-entering
* the event loop from processEventsWhileBlocked(). Note that in this
* case we keep track of the number of events we are processing, since
* processEventsWhileBlocked() wants to stop ASAP if there are no longer
* events to handle. */
if (ProcessingEventsWhileBlocked) {
uint64_t processed = 0;
processed += handleClientsWithPendingReadsUsingThreads();
processed += tlsProcessPendingData();
processed += handleClientsWithPendingWrites();
processed += freeClientsInAsyncFreeQueue();
server.events_processed_while_blocked += processed;
return;
}
//Unblock clients whose blocking timeout has expired
handleBlockedClientsTimeout();
//Distribute pending reads to the I/O threads and execute them
handleClientsWithPendingReadsUsingThreads();
tlsProcessPendingData();
aeSetDontWait(server.el, tlsHasPendingData());
//Cluster housekeeping
if (server.cluster_enabled) clusterBeforeSleep();
//Fast cycle of active key expiration
if (server.active_expire_enabled && server.masterhost == NULL)
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
//Unblock clients waiting on the WAIT command
if (listLength(server.clients_waiting_acks))
processClientsWaitingReplicas();
/* Check if there are clients unblocked by modules that implement
* blocking commands. */
if (moduleCount()) moduleHandleBlockedClients();
/* Try to process the pending commands of clients we just unblocked. */
if (listLength(server.unblocked_clients))
processUnblockedClients();
/* Send all the slaves an ACK request if at least one client blocked
* during the previous event loop iteration. Note that we do this after
* processUnblockedClients(), so if there are multiple pipelined WAITs
* and the just unblocked WAIT gets blocked again, we don't have to wait
* a server cron cycle in absence of other event loop events. See #6623. */
if (server.get_ack_from_slaves) {
robj *argv[3];
argv[0] = createStringObject("REPLCONF",8);
argv[1] = createStringObject("GETACK",6);
argv[2] = createStringObject("*",1); /* Not used argument. */
replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
decrRefCount(argv[0]);
decrRefCount(argv[1]);
decrRefCount(argv[2]);
server.get_ack_from_slaves = 0;
}
/* Send the invalidation messages to clients participating to the
* client side caching protocol in broadcasting (BCAST) mode. */
trackingBroadcastInvalidationMessages();
//Flush the AOF buffer to disk
flushAppendOnlyFile(0);
//Distribute pending writes to the I/O threads
handleClientsWithPendingWritesUsingThreads();
/* Close clients that need to be closed asynchronous */
freeClientsInAsyncFreeQueue();
/* Before we are going to sleep, let the threads access the dataset by
* releasing the GIL. Redis main thread will not touch anything at this
* time. */
if (moduleCount()) moduleReleaseGIL();
}
This function does the following:
- handleBlockedClientsTimeout: unblock clients whose blocking timeout has expired
- handleClientsWithPendingReadsUsingThreads: distribute pending reads to the I/O threads and execute them
- activeExpireCycle: a fast cycle of active key expiration
- processClientsWaitingReplicas: unblock clients blocked on the WAIT command
- flushAppendOnlyFile: flush the AOF buffer
- handleClientsWithPendingWritesUsingThreads: distribute pending writes to the I/O threads
From this we can see that every iteration of the event loop expires a batch of keys, and that flushAppendOnlyFile, analyzed in the earlier AOF article, is called here as well.
int handleClientsWithPendingReadsUsingThreads(void) {
if (!io_threads_active || !server.io_threads_do_reads) return 0;
int processed = listLength(server.clients_pending_read);
if (processed == 0) return 0;
if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);
//Distribute the clients in the pending-read queue across the I/O threads
listIter li;
listNode *ln;
listRewind(server.clients_pending_read,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
//Set io_threads_pending to kick the I/O threads into action
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
//List 0 is handled by the main thread itself
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[0]);
//Busy-wait until all the other threads are done
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O READ All threads finshed\n");
//Once all reads are done, process the input of every pending client
while(listLength(server.clients_pending_read)) {
ln = listFirst(server.clients_pending_read);
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_READ;
listDelNode(server.clients_pending_read,ln);
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~CLIENT_PENDING_COMMAND;
if (processCommandAndResetClient(c) == C_ERR) {
continue;
}
}
processInputBuffer(c);
}
return processed;
}
Here the clients in clients_pending_read are distributed round-robin across the thread lists; the main thread reads list 0 itself, busy-waits until the other threads have finished reading, and finally processes all the pending clients in a single pass.
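As a quick illustration of the target_id computation (not Redis code; io_threads_num = 4 and 10 pending clients are just assumed values), the modulo spreads the clients like this:

#include <stdio.h>

int main(void) {
    int io_threads_num = 4;        /* assumed value of server.io_threads_num */
    int pending_clients = 10;      /* assumed length of clients_pending_read */
    for (int item_id = 0; item_id < pending_clients; item_id++) {
        int target_id = item_id % io_threads_num;   /* same formula as above */
        printf("client %d -> io_threads_list[%d]%s\n", item_id, target_id,
               target_id == 0 ? " (read by the main thread itself)" : "");
    }
    return 0;
}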
void processInputBuffer(client *c) {
/* Keep processing while there is something in the input buffer */
while(c->qb_pos < sdslen(c->querybuf)) {
//Clients are paused
if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
//The client is blocked
if (c->flags & CLIENT_BLOCKED) break;
//Don't process clients flagged CLIENT_PENDING_COMMAND again
if (c->flags & CLIENT_PENDING_COMMAND) break;
/* Don't process input from the master while there is a busy script
* condition on the slave. We want just to accumulate the replication
* stream (instead of replying -BUSY like we do with other clients) and
* later resume the processing. */
if (server.lua_timedout && c->flags & CLIENT_MASTER) break;
if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
/* Determine request type when unknown. */
if (!c->reqtype) {
if (c->querybuf[c->qb_pos] == '*') {
c->reqtype = PROTO_REQ_MULTIBULK;
} else {
c->reqtype = PROTO_REQ_INLINE;
}
}
if (c->reqtype == PROTO_REQ_INLINE) {
if (processInlineBuffer(c) != C_OK) break;
/* If the Gopher mode and we got zero or one argument, process
* the request in Gopher mode. */
if (server.gopher_enabled &&
((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||
c->argc == 0))
{
processGopherRequest(c);
resetClient(c);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
break;
}
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != C_OK) break;
} else {
serverPanic("Unknown request type");
}
/* Multibulk processing could see a <= 0 length. */
if (c->argc == 0) {
resetClient(c);
} else {
//When running in an I/O thread this breaks out: the command itself must not be executed off the main thread
if (c->flags & CLIENT_PENDING_READ) {
c->flags |= CLIENT_PENDING_COMMAND;
break;
}
//The core entry point for executing the command
if (processCommandAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid exiting this
* loop and trimming the client buffer later. So we return
* ASAP in that case. */
return;
}
}
}
//Trim the already-consumed part of the query buffer
if (c->qb_pos) {
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
}
}
readQueryFromClient reads the command into querybuf and calls processInputBuffer to parse it and execute the command. With threaded reads enabled, the I/O thread does not actually execute the command; it sets the CLIENT_PENDING_COMMAND flag and leaves execution to the main thread. That brings us back to the last step of handleClientsWithPendingReadsUsingThreads, which calls processCommandAndResetClient to run the command.
int processCommandAndResetClient(client *c) {
int deadclient = 0;
server.current_client = c;
if (processCommand(c) == C_OK) {
commandProcessed(c);
}
if (server.current_client == NULL) deadclient = 1;
server.current_client = NULL;
/* freeMemoryIfNeeded may flush slave output buffers. This may
* result into a slave, that may be the active client, to be
* freed. */
return deadclient ? C_ERR : C_OK;
}
This calls processCommand, entering the command execution flow analyzed in previous articles.
int handleClientsWithPendingWritesUsingThreads(void) {
int processed = listLength(server.clients_pending_write);
if (processed == 0) return 0; /* Return ASAP if there are no clients. */
//Check whether we can skip threaded I/O for this batch of writes
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}
//Start the I/O threads if they are not active
if (!io_threads_active) startThreadedIO();
if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);
//Distribute the clients in clients_pending_write across the I/O threads
listIter li;
listNode *ln;
listRewind(server.clients_pending_write,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
io_threads_op = IO_THREADS_OP_WRITE;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
//The main thread also takes part, handling list 0
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
writeToClient(c,0);
}
listEmpty(io_threads_list[0]);
//Busy-wait until all threads have finished their work
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O WRITE All threads finshed\n");
//For clients that still have data left to send, install a write handler
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
/* Install the write handler if there are pending writes in some
* of the clients. */
if (clientHasPendingReplies(c) &&
connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
{
freeClientAsync(c);
}
}
listEmpty(server.clients_pending_write);
return processed;
}
Handling pending writes is very similar to handling pending reads, except that stopThreadedIOIfNeeded is consulted first to decide whether threaded I/O should be switched off. For example, with io-threads set to 4, threaded I/O is stopped once fewer than 8 clients have pending writes.
int stopThreadedIOIfNeeded(void) {
int pending = listLength(server.clients_pending_write);
if (server.io_threads_num == 1) return 1;
//If the number of pending writes is less than twice the number of I/O threads, stop threaded I/O
if (pending < (server.io_threads_num*2)) {
if (io_threads_active) stopThreadedIO();
return 1;
} else {
return 0;
}
}
void stopThreadedIO(void) {
//Before stopping, drain any pending threaded reads one last time
handleClientsWithPendingReadsUsingThreads();
if (tio_debug) { printf("E"); fflush(stdout); }
if (tio_debug) printf("--- STOPPING THREADED IO [R%d] [W%d] ---\n",
(int) listLength(server.clients_pending_read),
(int) listLength(server.clients_pending_write));
serverAssert(io_threads_active == 1);
for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_lock(&io_threads_mutex[j]);
io_threads_active = 0;
}
Because the main thread takes pthread_mutex_lock on each I/O thread's mutex, the I/O threads block (are parked) the next time they reach the lock call in IOThreadMain, and stay parked until startThreadedIO unlocks the mutexes again.
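For completeness, startThreadedIO is roughly the mirror image (sketched from the 6.0 sources, so take it as approximate): it unlocks the per-thread mutexes so the threads parked in IOThreadMain resume, and marks threaded I/O as active.

void startThreadedIO(void) {
    if (tio_debug) { printf("S"); fflush(stdout); }
    serverAssert(io_threads_active == 0);
    //Unlock the mutexes locked in initThreadedIO/stopThreadedIO so the
    //threads blocked in IOThreadMain's pthread_mutex_lock can continue
    for (int j = 1; j < server.io_threads_num; j++)
        pthread_mutex_unlock(&io_threads_mutex[j]);
    io_threads_active = 1;
}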
int handleClientsWithPendingWrites(void) {
listIter li;
listNode *ln;
int processed = listLength(server.clients_pending_write);
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
listDelNode(server.clients_pending_write,ln);
//Skip protected clients
if (c->flags & CLIENT_PROTECTED) continue;
/* Try to write buffers to the client socket. */
if (writeToClient(c,0) == C_ERR) continue;
//If there is still data left after the write
if (clientHasPendingReplies(c)) {
int ae_barrier = 0;
//With AOF enabled and appendfsync=always, set ae_barrier so the write is performed before handling new reads (AE_BARRIER inverts the usual read-then-write order)
if (server.aof_state == AOF_ON &&
server.aof_fsync == AOF_FSYNC_ALWAYS)
{
ae_barrier = 1;
}
if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
freeClientAsync(c);
}
}
}
return processed;
}
The main thread returns the execution results to the client; the core call is writeToClient, which sends the contents of the reply buffer. We won't go through that flow in detail here.
Summary
Redis creates three fixed background (bio) threads, plus several I/O threads for reading client data and sending replies. At the start of each event loop iteration it processes the clients queued during the previous iteration, distributing read tasks to the I/O threads and waiting for them to finish before the main thread executes the database operations. This reduces the cost of network I/O on the main thread, but the threads themselves add overhead, so the improvement is limited.