。
前言
开始写一些博客主要是在看完代码后再温故总结一遍, 也是为了后面回头也能查阅。本系列会从官网的例子出发, 尽可能以链路追踪的方式记录其中源码核心模块的实现, 本篇例子来源
涉及的知识点
- libuv 中线程池的实现
- libuv 异步 i/o 的实现
- libuv 中线程通信
- 线程锁,条件变量 信号量 在 libuv 的运用
- epoll 在 libuv 的运用
- eventfd 在 libuv 的运用
- async npm 包
- Assembler Instructions with C Expression Operands
例子 uvcat/main.c
一个耗时的 fs_open 任务如何异步的实现的例子。
int main(int argc, char **argv) {
uv_fs_open(uv_default_loop(), &open_req, argv[1], O_RDONLY, 0, on_open);
uv_run(uv_default_loop(), UV_RUN_DEFAULT);
uv_fs_req_cleanup(&open_req);
uv_fs_req_cleanup(&read_req);
uv_fs_req_cleanup(&write_req);
return 0;
}
复制代码
uv_fs_open
main > uv_fs_open
开始向事件循环 uv_default_loop 中注册任务。
int uv_fs_open(uv_loop_t* loop,
uv_fs_t* req,
const char* path,
int flags,
int mode,
uv_fs_cb cb) {
// ? 进行一些数据初始化操作
INIT(OPEN);
// ? 对传入的文件路径参数检查
PATH;
req->flags = flags;
req->mode = mode;
POST;
}
复制代码
INIT
main > uv_fs_open > INIT
把 uv_fs_open 传入的最后一个参数 cb, 挂载在了 uv_fs_t* req 的 cb 属性上了。
#define INIT(subtype) \
do { \
if (req == NULL) \
return UV_EINVAL; \
UV_REQ_INIT(req, UV_FS); \
req->fs_type = UV_FS_ ## subtype; \
req->result = 0; \
req->ptr = NULL; \
req->loop = loop; \
req->path = NULL; \
req->new_path = NULL; \
req->bufs = NULL; \
req->cb = cb; \
} \
while (0)
复制代码
POST
main > uv_fs_open > POST
主要调用了 uv__work_submit 函数, 提交一个耗时的任务到线程池去完成。
#define POST \
do { \
if (cb != NULL) { \
// (loop)->active_reqs.count++;
uv__req_register(loop, req); \
uv__work_submit(loop, \
&req->work_req, \
UV__WORK_FAST_IO, \
// ? 调用 open 方法打开一个文件
uv__fs_work, \
// ? 调用 uv_fs_open 函数参数中传入的 on_open
uv__fs_done); \
return 0; \
} \
else { \
uv__fs_work(&req->work_req); \
return req->result; \
} \
} \
while (0)
复制代码
uv_queue_work
其实对于任意一种耗时的任务, libuv 也提供另外一种叫工作队列的方法, 可以轻松的提交任意数量的耗时任务到线程池中去解决。
实现和上面的 POST 方法类似, 也是通过调用 uv__work_submit 去提交一个任务, 只不过这个任务就不是 uv__fs_work, 而可以是用户传入的任何任务。下面是一个 uv_queue_work 的例子
可以看见通过一个 for 循环, 轻易通过 uv_queue_work 提交了若干个 after_fib 任务到线程池
// uv_queue_work 的例子
int main() {
loop = uv_default_loop();
int data[FIB_UNTIL];
uv_work_t req[FIB_UNTIL];
int i;
for (i = 0; i < FIB_UNTIL; i++) {
data[i] = i;
req[i].data = (void *) &data[i];
uv_queue_work(loop, &req[i], fib, after_fib);
}
return uv_run(loop, UV_RUN_DEFAULT);
}
复制代码
uv_queue_work 的使用方式看上去和 async 这个 npm 包十分类似, 只不过 async 是提交的任务还是在该线程运行, uv_queue_work 提交的会交给线程池去运行。
// async 的例子
async.parallel([
function(callback) { ... },
function(callback) { ... }
], function(err, results) {
// optional callback
});
复制代码
uv__work_submit
main > uv_fs_open > POST > uv__work_submit
把 POST 传入的 uv__fs_done 挂载在了 struct uv__work* w 的 done 属性上了。
把 POST 传入的 uv__fs_work 挂载在了 struct uv__work* w 的 work 属性上了。
此时我们可以看下调用的 init_once 函数, 发现其主要是调用了 init_threads 函数
void uv__work_submit(uv_loop_t* loop,
struct uv__work* w,
enum uv__work_kind kind,
void (*work)(struct uv__work* w),
void (*done)(struct uv__work* w, int status)) {
uv_once(&once, init_once);
w->loop = loop;
w->work = work;
w->done = done;
// ? 调用 uv_cond_signal 唤醒一个线程, 开始干活了 ~
post(&w->wq, kind);
}
复制代码
init_threads – 线程池
main > uv_fs_open > POST > uv__work_submit > init_once > init_threads
线程池的初始化, 由于代码较长, 去除了部分不太重要的代码, 对应源码在 /deps/uv/src/threadpool.c。
下面会讲一下初始化过程中调用的 uv_cond, uv_sem 的知识。最后讲一下线程的工作内容即 worker 函数。
static void init_threads(void) {
...
// ? 条件变量
if (uv_cond_init(&cond))
abort();
// ? 锁: 一个线程加的锁必须由该线程解锁
if (uv_mutex_init(&mutex))
abort();
QUEUE_INIT(&wq);
QUEUE_INIT(&slow_io_pending_wq);
QUEUE_INIT(&run_slow_work_message);
// ? 信号量
if (uv_sem_init(&sem, 0))
abort();
for (i = 0; i < nthreads; i++)
if (uv_thread_create(threads + i, worker, &sem))
abort();
for (i = 0; i < nthreads; i++)
uv_sem_wait(&sem);
uv_sem_destroy(&sem);
}
复制代码
uv_cond – 条件变量
main > uv_fs_open > POST > uv__work_submit > init_once > init_threads > uv_cond
在这里线程池中的线程当任务队列为空时 pthread_cond_wait 会一直等待在这, 等到主线程提交一个任务后会调用 pthread_cond_signal 函数唤醒一个线程开始工作
// 线程一伪代码
pthread_mutex_lock(&mutex);
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
// 线程二伪代码
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond, &mutex);
pthread_mutex_unlock(&mutex);
复制代码
uv_sem – 信号量
main > uv_fs_open > POST > uv__work_submit > init_once > init_threads > uv_sem
uv_sem_wait 也会使线程进入沉睡, 当有其他线程调用一次 uv_sem_post 后会运行一次, 这里的 uv_sem_wait 就是等所有 uv_thread_create 创建的 worker 都成功后代码才会继续往下执行, 其中每个 worker 函数里面会调用一次 uv_sem_post。
保证 init_threads 函数是等线程池全部创建完成才结束运行。
和 uv_cond 有类似的地方, 比如用 uv_cond 实现的话, 每创建一次 worker 计数一次, 当时最后一次创建的线程的 worker 里面调用一次 pthread_cond_signal 我觉得也是可行的。
- uv_sem_wait – 在 init_threads 函数中被调用
- uv_sem_post – 在 worker 函数中被调用
worker
main > uv_fs_open > POST > uv__work_submit > init_once > init_threads > worker
线程运行的函数, 当队列为空时该线程函数会一直沉睡在 uv_cond_wait 函数处, 当 uv__work_submit 提交了一个函数调用了 pthread_cond_signal 后, 一个线程开始工作 ~
static void worker(void* arg) {
...
// ? 通知主线程 uv_sem_wait 可以开始运行一次了
uv_sem_post((uv_sem_t*) arg);
arg = NULL;
uv_mutex_lock(&mutex);
for (;;) {
/* `mutex` should always be locked at this point. */
/* Keep waiting while either no work is present or only slow I/O
and we're at the threshold for that. */
while (QUEUE_EMPTY(&wq) ||
(QUEUE_HEAD(&wq) == &run_slow_work_message &&
QUEUE_NEXT(&run_slow_work_message) == &wq &&
slow_io_work_running >= slow_work_thread_threshold())) {
idle_threads += 1;
// ? 队列为空时, 线程会一直沉睡在这
uv_cond_wait(&cond, &mutex);
idle_threads -= 1;
}
...
uv_mutex_unlock(&mutex);
w = QUEUE_DATA(q, struct uv__work, wq);
// ? 开始执行 uv__fs_work 工作
w->work(w);
uv_mutex_lock(&w->loop->wq_mutex);
w->work = NULL; /* Signal uv_cancel() that the work req is done
executing. */
QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
// ? uv__fs_work 任务完成后, 开始通知对应 fd
uv_async_send(&w->loop->wq_async);
...
}
}
复制代码
uv_async_send
main > uv_fs_open > POST > uv__work_submit > init_once > init_threads > worker > uv_async_send
当该线程耗时的 w->work(即此例子的 uv__work_submit 中设置的 uv__fs_work)任务完成后, 该去通知主线程了
uv_async_send 会判断当前任务有没有其他线程在进行操作, 如果有其他线程正在调用 uv__async_send 发送消息, 则直接跳过, 最后有一个线程通知到主线程即可
该函数中调用的 cmpxchgi 函数实现又是干什么的了。
int uv_async_send(uv_async_t* handle) {
/* Do a cheap read first. */
if (ACCESS_ONCE(int, handle->pending) != 0)
return 0;
/* Tell the other thread we're busy with the handle. */
if (cmpxchgi(&handle->pending, 0, 1) != 0)
return 0;
/* Wake up the other thread's event loop. */
uv__async_send(handle->loop);
/* Tell the other thread we're done. */
if (cmpxchgi(&handle->pending, 1, 2) != 1)
abort();
return 0;
}
复制代码
cmpxchgi
main > uv_fs_open > POST > uv__work_submit > init_once > init_threads > worker > uv_async_send > cmpxchgi
内联汇编语句看不懂不要紧, 我们可以看看等价的 __sync_val_compare_and_swap 函数的作用就好了
UV_UNUSED(static int cmpxchgi(int* ptr, int oldval, int newval)) {
#if defined(__i386__) || defined(__x86_64__)
int out;
__asm__ __volatile__ ("lock; cmpxchg %2, %1;"
: "=a" (out), "+m" (*(volatile int*) ptr)
: "r" (newval), "0" (oldval)
: "memory");
return out;
#elif defined(__MVS__)
unsigned int op4;
if (__plo_CSST(ptr, (unsigned int*) &oldval, newval,
(unsigned int*) ptr, *ptr, &op4))
return oldval;
else
return op4;
#elif defined(__SUNPRO_C) || defined(__SUNPRO_CC)
return atomic_cas_uint((uint_t *)ptr, (uint_t)oldval, (uint_t)newval);
#else
return __sync_val_compare_and_swap(ptr, oldval, newval);
#endif
}
复制代码
__sync_val_compare_and_swap
main > uv_fs_open > POST > uv__work_submit > init_once > init_threads > worker > uv_async_send > cmpxchgi > __sync_val_compare_and_swap
提供原子的比较和交换,如果*ptr == oldval,就将 newval 写入 *ptr。看完解释还是能理解一点了, 类似于赋值操作, 那么回到主线看看 uv__async_send 函数吧
type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)
复制代码
uv__async_send
main > uv_fs_open > POST > uv__work_submit > init_once > init_threads > worker > uv_async_send > uv__async_send
可见主要是在 loop->async_io_watcher.fd 上面写了数据, 此时 epoll 该登场了, 它就是负责观察所有通过 epoll_ctl 函数注册的 fd,当某个 fd 变化,就通知到对应的 i/o 观察者。
等等再说 epoll 之前, 先看一下我们的 i/o 观察者是在何时注册的 ?
static void uv__async_send(uv_loop_t* loop) {
const void* buf;
ssize_t len;
int fd;
int r;
buf = "";
len = 1;
fd = loop->async_wfd;
#if defined(__linux__)
if (fd == -1) {
static const uint64_t val = 1;
buf = &val;
len = sizeof(val);
// ? 注意该处的 fd 挂载在 loop->async_io_watcher.fd 上
fd = loop->async_io_watcher.fd; /* eventfd */
}
#endif
do
// ? fd 上面写入数据, 通知主线程
r = write(fd, buf, len);
while (r == -1 && errno == EINTR);
if (r == len)
return;
if (r == -1)
if (errno == EAGAIN || errno == EWOULDBLOCK)
return;
abort();
}
复制代码
uv__io_start – 注册 i/o 观察者
main > uv_default_loop > uv_loop_init > uv_async_init > uv__async_start > uv__io_start
调用 uv__io_start 函数注册一个 i/o 观察者。那么该 i/o 观察者需要观察的 fd 是何时设置的了 ?
void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
...
if (QUEUE_EMPTY(&w->watcher_queue))
QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue);
if (loop->watchers[w->fd] == NULL) {
loop->watchers[w->fd] = w;
loop->nfds++;
}
}
复制代码
uv__io_init – 初始化 i/o 观察者
main > uv_default_loop > uv_loop_init > uv_async_init > uv__async_start > uv__io_init
注意本例子的目的是主线程希望知道什么时候子线程完成了任务, 所以我们需要先获得一个线程通信的 fd 用来观察就行了
可以看到在 uv__async_start 函数中是通过 eventfd 拿到线程通信的 fd, 然后通过调用 uv__io_init 给挂载在观察者上。同时设置了该观察者的回调函数为 uv__async_io 。
此时我们已经成功初始化了一个 i/o 观察者, 接下来就等着有数据写入时, epoll 捕获到调用观察者设置的回调函数就好了。
static int uv__async_start(uv_loop_t* loop) {
...
#ifdef __linux__
err = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (err < 0)
return UV__ERR(errno);
pipefd[0] = err;
pipefd[1] = -1;
#else
err = uv__make_pipe(pipefd, UV_NONBLOCK_PIPE);
if (err < 0)
return err;
#endif
uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]);
uv__io_start(loop, &loop->async_io_watcher, POLLIN);
loop->async_wfd = pipefd[1];
return 0;
}
复制代码
uv__async_io – 异步 i/o 观察者回调
uv__async_io 首先会把其他线程写入的数据给读完, 最后调用了 h->async_cb 函数。
其中的 h->async_cb 函数又是什么东西了?
static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
char buf[1024];
ssize_t r;
QUEUE queue;
QUEUE* q;
uv_async_t* h;
assert(w == &loop->async_io_watcher);
for (;;) {
r = read(w->fd, buf, sizeof(buf));
if (r == sizeof(buf))
continue;
if (r != -1)
break;
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
if (errno == EINTR)
continue;
abort();
}
QUEUE_MOVE(&loop->async_handles, &queue);
while (!QUEUE_EMPTY(&queue)) {
q = QUEUE_HEAD(&queue);
h = QUEUE_DATA(q, uv_async_t, queue);
QUEUE_REMOVE(q);
QUEUE_INSERT_TAIL(&loop->async_handles, q);
if (0 == uv__async_spin(h))
continue; /* Not pending. */
if (h->async_cb == NULL)
continue;
h->async_cb(h);
}
}
复制代码
uv_async_init
main > uv_default_loop > uv_loop_init > uv_async_init
h->async_cb 函数发现是在 uv_async_init 中设置的, 需要再往上查找调用 uv_async_init 的地方才能知道第三个参数 async_cb 的真实的值。
int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
int err;
err = uv__async_start(loop);
if (err)
return err;
uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC);
handle->async_cb = async_cb;
handle->pending = 0;
QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue);
uv__handle_start(handle);
return 0;
}
复制代码
uv_loop_init
main > uv_default_loop > uv_loop_init
uv__async_io 函数的 h->async_cb(h),其实是 uv_loop_init 函数里面设置的 uv__work_done
int uv_loop_init(uv_loop_t* loop) {
....
err = uv_async_init(loop, &loop->wq_async, uv__work_done);
...
}
复制代码
uv__work_done
uv__work_done 主要是调用了 w->done(w, err), 等等, 这不就是 uv__work_submit 函数中设置的 uv__fs_done 函数吗!
void uv__work_done(uv_async_t* handle) {
...
while (!QUEUE_EMPTY(&wq)) {
q = QUEUE_HEAD(&wq);
QUEUE_REMOVE(q);
w = container_of(q, struct uv__work, wq);
err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
w->done(w, err);
}
}
复制代码
uv__fs_done
uv__fs_done 函数主要调用的是 req->cb(req), 这就是 INIT 中设置的例子中的 on_open 函数了!
static void uv__fs_done(struct uv__work* w, int status) {
uv_fs_t* req;
req = container_of(w, uv_fs_t, work_req);
uv__req_unregister(req->loop, req);
if (status == UV_ECANCELED) {
assert(req->result == 0);
req->result = UV_ECANCELED;
}
req->cb(req);
}
复制代码
epoll – 概念
epoll是Linux内核为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。epoll除了提供select/poll那种IO事件的水平触发(Level Triggered)外,还提供了边缘触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
复制代码
- epoll_create – 在内核中创建epoll实例并返回一个epoll文件描述符。 在最初的实现中,调用者通过 size 参数告知内核需要监听的文件描述符数量。如果监听的文件描述符数量超过 size, 则内核会自动扩容。而现在 size 已经没有这种语义了,但是调用者调用时 size 依然必须大于 0,以保证后向兼容性。
- epoll_ctl – 向 epfd 对应的内核epoll 实例添加、修改或删除对 fd 上事件 event 的监听。op 可以为 EPOLL_CTL_ADD, EPOLL_CTL_MOD, EPOLL_CTL_DEL 分别对应的是添加新的事件,修改文件描述符上监听的事件类型,从实例上删除一个事件。如果 event 的 events 属性设置了 EPOLLET flag,那么监听该事件的方式是边缘触发。
- epoll_wait – 当 timeout 为 0 时,epoll_wait 永远会立即返回。而 timeout 为 -1 时,epoll_wait 会一直阻塞直到任一已注册的事件变为就绪。当 timeout 为一正整数时,epoll 会阻塞直到计时 timeout 毫秒终了或已注册的事件变为就绪。因为内核调度延迟,阻塞的时间可能会略微超过 timeout 毫秒。
epoll_create – 创建epoll对象
main > uv_default_loop > uv_loop_init > uv__platform_loop_init > epoll_create
如下图所示,当某个进程调用epoll_create方法时,内核会创建一个eventpoll对象(也就是程序中epfd所代表的对象)。eventpoll对象也是文件系统中的一员,和socket一样,它也会有等待队列。
可见 libuv 是在 uv__platform_loop_init 函数中进行创建的。
int uv__platform_loop_init(uv_loop_t* loop) {
int fd;
// ? 此处进行 epoll_create fd 的创建
fd = epoll_create1(O_CLOEXEC);
/* epoll_create1() can fail either because it's not implemented (old kernel)
* or because it doesn't understand the O_CLOEXEC flag.
*/
if (fd == -1 && (errno == ENOSYS || errno == EINVAL)) {
fd = epoll_create(256);
if (fd != -1)
uv__cloexec(fd, 1);
}
loop->backend_fd = fd;
loop->inotify_fd = -1;
loop->inotify_watchers = NULL;
if (fd == -1)
return UV__ERR(errno);
return 0;
}
复制代码
epoll_ctl – 维护监视列表
main > uv_run > uv__io_poll >
创建epoll对象后,可以用epoll_ctl添加或删除所要监听的socket。以添加socket为例,如下图,如果通过epoll_ctl添加sock1、sock2和sock3的监视,内核会将eventpoll添加到这三个socket的等待队列中。
libuv 中在 uv__io_poll 函数里被调用。
uv__io_poll: 函数是上节 【libuv 源码学习笔记】1. 事件循环 事件循环中非常重要的一个阶段。
void uv__io_poll(uv_loop_t* loop, int timeout) {
...
while (!QUEUE_EMPTY(&loop->watcher_queue)) {
...
// ? epoll_ctl - 事件注册函数,注册新的fd到epfd的epool对象空间中,并指明event可读写
if (epoll_ctl(loop->backend_fd, op, w->fd, &e)) {
if (errno != EEXIST)
abort();
assert(op == EPOLL_CTL_ADD);
/* We've reactivated a file descriptor that's been watched before. */
if (epoll_ctl(loop->backend_fd, EPOLL_CTL_MOD, w->fd, &e))
abort();
}
w->events = w->pevents;
}
...
}
复制代码
epoll_wait
main > uv_run > uv__io_poll >
当socket收到数据后,中断程序会给eventpoll的“就绪列表”添加socket引用。如下图展示的是sock2和sock3收到数据后,中断程序让rdlist引用这两个socket。
当程序执行到epoll_wait时,如果rdlist已经引用了socket,那么epoll_wait直接返回,如果rdlist为空,阻塞进程。
这也是上一节事件循环中, 如果没有其他阶段没有任务, 会使进程一直阻塞到一个 timer 超时的时间。
libuv 中在 uv__io_poll 函数里被调用。
void uv__io_poll(uv_loop_t* loop, int timeout) {
...
for (;;) {
//
if (no_epoll_wait != 0 || (sigmask != 0 && no_epoll_pwait == 0)) {
// ? epoll_wait - 阻塞直到任一已注册的事件变为就绪
nfds = epoll_pwait(loop->backend_fd,
events,
ARRAY_SIZE(events),
timeout,
&sigset);
if (nfds == -1 && errno == ENOSYS) {
uv__store_relaxed(&no_epoll_pwait_cached, 1);
no_epoll_pwait = 1;
}
} else {
// ? epoll_wait - 阻塞直到任一已注册的事件变为就绪
nfds = epoll_wait(loop->backend_fd,
events,
ARRAY_SIZE(events),
timeout);
if (nfds == -1 && errno == ENOSYS) {
uv__store_relaxed(&no_epoll_wait_cached, 1);
no_epoll_wait = 1;
}
}
...
// ? 遍历被内核IO事件异步唤醒而加入Ready队列的描述符集合
for (i = 0; i < nfds; i++) {
pe = events + i;
fd = pe->data.fd;
...
if (pe->events != 0) {
/* Run signal watchers last. This also affects child process watchers
* because those are implemented in terms of signal watchers.
*/
if (w == &loop->signal_io_watcher) {
have_signals = 1;
} else {
uv__metrics_update_idle_time(loop);
// ? 调用观察者注册的回调
w->cb(loop, w, pe->events);
}
nevents++;
}
}
...
}
复制代码
uv_fs_req_cleanup
main > uv_fs_req_cleanup
当 uv_fs_open 回调 on_open 被调用, 本例子中的事件循环 uv_run 函数运行结束, 代码开始运行 uv_fs_req_cleanup, 进行垃圾回收, 本次程序顺利退出。
小结
Q: 本例子中一个耗时的 fs_open 的任务如何通过异步的方式实现 ?
A: 程序先会初始化一次线程池, 任务队列为空时线程都进入沉睡状态。当调用 uv_fs_open 方法提交一个 fs_open 任务时, 会通过 pthread_cond_signal 唤醒一个线程开始工作, 工作内容即为 fs_open, 在该线程内同步等待 fs_open 函数结束, 然后去通知主线程。
Q: 那么如何通知主线程了
A: 耗时的任务原来交给了其他线程, 主线程被通知其实是首先通过 eventfd 获取到了一个线程通信的 fd, 然后通过 epoll 机制去注册一个 i/o 观察者, 当其他线程任务完成后, 向该 fd 写入数据。被 epoll 捕获到调用主线程早已设置好的回调函数就好了。
原文链接: github.com/xiaoxiaojx/… 不定时会更新, 感兴趣的大佬可以点个 Star 支持, 谢谢阅读 ~