NodeJs 线程池与i/o

image

前言

开始写一些博客主要是在看完代码后再温故总结一遍, 也是为了后面回头也能查阅。本系列会从官网的例子出发, 尽可能以链路追踪的方式记录其中源码核心模块的实现, 本篇例子来源

涉及的知识点

例子 uvcat/main.c

一个耗时的 fs_open 任务如何异步的实现的例子。

int main(int argc, char **argv) {
    uv_fs_open(uv_default_loop(), &open_req, argv[1], O_RDONLY, 0, on_open);
    uv_run(uv_default_loop(), UV_RUN_DEFAULT);

    uv_fs_req_cleanup(&open_req);
    uv_fs_req_cleanup(&read_req);
    uv_fs_req_cleanup(&write_req);
    return 0;
}
复制代码

uv_fs_open

main > uv_fs_open

开始向事件循环 uv_default_loop 中注册任务。

int uv_fs_open(uv_loop_t* loop,
               uv_fs_t* req,
               const char* path,
               int flags,
               int mode,
               uv_fs_cb cb) {
  // ? 进行一些数据初始化操作
  INIT(OPEN);
  // ? 对传入的文件路径参数检查
  PATH;
  req->flags = flags;
  req->mode = mode;
  POST;
}
复制代码

INIT

main > uv_fs_open > INIT

把 uv_fs_open 传入的最后一个参数 cb, 挂载在了 uv_fs_t* req 的 cb 属性上了。

#define INIT(subtype)                                                         \
  do {                                                                        \
    if (req == NULL)                                                          \
      return UV_EINVAL;                                                       \
    UV_REQ_INIT(req, UV_FS);                                                  \
    req->fs_type = UV_FS_ ## subtype;                                         \
    req->result = 0;                                                          \
    req->ptr = NULL;                                                          \
    req->loop = loop;                                                         \
    req->path = NULL;                                                         \
    req->new_path = NULL;                                                     \
    req->bufs = NULL;                                                         \
    req->cb = cb;                                                             \
  }                                                                           \
  while (0)
复制代码

POST

main > uv_fs_open > POST

主要调用了 uv__work_submit 函数, 提交一个耗时的任务到线程池去完成。

#define POST                                                                  \
  do {                                                                        \
    if (cb != NULL) {                                                         \
      //  (loop)->active_reqs.count++;
      uv__req_register(loop, req);                                            \
      uv__work_submit(loop,                                                   \
                      &req->work_req,                                         \
                      UV__WORK_FAST_IO,                                       \
                      // ? 调用 open 方法打开一个文件
                      uv__fs_work,                                            \
                      // ? 调用 uv_fs_open 函数参数中传入的 on_open
                      uv__fs_done);                                           \
      return 0;                                                               \
    }                                                                         \
    else {                                                                    \
      uv__fs_work(&req->work_req);                                            \
      return req->result;                                                     \
    }                                                                         \
  }                                                                           \
  while (0)
复制代码

uv_queue_work

其实对于任意一种耗时的任务, libuv 也提供另外一种叫工作队列的方法, 可以轻松的提交任意数量的耗时任务到线程池中去解决。

实现和上面的 POST 方法类似, 也是通过调用 uv__work_submit 去提交一个任务, 只不过这个任务就不是 uv__fs_work, 而可以是用户传入的任何任务。下面是一个 uv_queue_work 的例子

可以看见通过一个 for 循环, 轻易通过 uv_queue_work 提交了若干个 after_fib 任务到线程池

// uv_queue_work 的例子

int main() {
    loop = uv_default_loop();

    int data[FIB_UNTIL];
    uv_work_t req[FIB_UNTIL];
    int i;
    for (i = 0; i < FIB_UNTIL; i++) {
        data[i] = i;
        req[i].data = (void *) &data[i];
        uv_queue_work(loop, &req[i], fib, after_fib);
    }

    return uv_run(loop, UV_RUN_DEFAULT);
}
复制代码

uv_queue_work 的使用方式看上去和 async 这个 npm 包十分类似, 只不过 async 是提交的任务还是在该线程运行, uv_queue_work 提交的会交给线程池去运行。

// async 的例子

async.parallel([
    function(callback) { ... },
    function(callback) { ... }
], function(err, results) {
    // optional callback
});
复制代码

uv__work_submit

main > uv_fs_open > POST > uv__work_submit

把 POST 传入的 uv__fs_done 挂载在了 struct uv__work* w 的 done 属性上了。

把 POST 传入的 uv__fs_work 挂载在了 struct uv__work* w 的 work 属性上了。

此时我们可以看下调用的 init_once 函数, 发现其主要是调用了 init_threads 函数

void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     enum uv__work_kind kind,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)) {
  uv_once(&once, init_once);
  w->loop = loop;
  w->work = work;
  w->done = done;
  // ? 调用 uv_cond_signal 唤醒一个线程, 开始干活了 ~
  post(&w->wq, kind);
}
复制代码

init_threads – 线程池

main > uv_fs_open > POST > uv__work_submit > init_once > init_threads

线程池的初始化, 由于代码较长, 去除了部分不太重要的代码, 对应源码在 /deps/uv/src/threadpool.c。

下面会讲一下初始化过程中调用的 uv_cond, uv_sem 的知识。最后讲一下线程的工作内容即 worker 函数。

static void init_threads(void) {
  ...

  // ? 条件变量
  if (uv_cond_init(&cond))
    abort();
    
  // ? 锁: 一个线程加的锁必须由该线程解锁
  if (uv_mutex_init(&mutex))
    abort();
  QUEUE_INIT(&wq);
  QUEUE_INIT(&slow_io_pending_wq);
  QUEUE_INIT(&run_slow_work_message);
  
  // ? 信号量
  if (uv_sem_init(&sem, 0))
    abort();
  for (i = 0; i < nthreads; i++)
    if (uv_thread_create(threads + i, worker, &sem))
      abort();
  for (i = 0; i < nthreads; i++)
    uv_sem_wait(&sem);
  uv_sem_destroy(&sem);
}
复制代码

uv_cond – 条件变量

main > uv_fs_open > POST > uv__work_submit > init_once > init_threads > uv_cond

在这里线程池中的线程当任务队列为空时 pthread_cond_wait 会一直等待在这, 等到主线程提交一个任务后会调用 pthread_cond_signal 函数唤醒一个线程开始工作

// 线程一伪代码
pthread_mutex_lock(&mutex);
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);

// 线程二伪代码
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond, &mutex);
pthread_mutex_unlock(&mutex);
复制代码

uv_sem – 信号量

main > uv_fs_open > POST > uv__work_submit > init_once > init_threads > uv_sem

uv_sem_wait 也会使线程进入沉睡, 当有其他线程调用一次 uv_sem_post 后会运行一次, 这里的 uv_sem_wait 就是等所有 uv_thread_create 创建的 worker 都成功后代码才会继续往下执行, 其中每个 worker 函数里面会调用一次 uv_sem_post。

保证 init_threads 函数是等线程池全部创建完成才结束运行。

和 uv_cond 有类似的地方, 比如用 uv_cond 实现的话, 每创建一次 worker 计数一次, 当时最后一次创建的线程的 worker 里面调用一次 pthread_cond_signal 我觉得也是可行的。

  • uv_sem_wait – 在 init_threads 函数中被调用
  • uv_sem_post – 在 worker 函数中被调用

worker

main > uv_fs_open > POST > uv__work_submit > init_once > init_threads > worker

线程运行的函数, 当队列为空时该线程函数会一直沉睡在 uv_cond_wait 函数处, 当 uv__work_submit 提交了一个函数调用了 pthread_cond_signal 后, 一个线程开始工作 ~

static void worker(void* arg) {
  ...
  
  // ? 通知主线程 uv_sem_wait 可以开始运行一次了
  uv_sem_post((uv_sem_t*) arg);
  arg = NULL;

  uv_mutex_lock(&mutex);
  for (;;) {
    /* `mutex` should always be locked at this point. */

    /* Keep waiting while either no work is present or only slow I/O
       and we're at the threshold for that. */
    while (QUEUE_EMPTY(&wq) ||
           (QUEUE_HEAD(&wq) == &run_slow_work_message &&
            QUEUE_NEXT(&run_slow_work_message) == &wq &&
            slow_io_work_running >= slow_work_thread_threshold())) {
      idle_threads += 1;
      // ? 队列为空时, 线程会一直沉睡在这
      uv_cond_wait(&cond, &mutex);
      idle_threads -= 1;
    }

    ...

    uv_mutex_unlock(&mutex);

    w = QUEUE_DATA(q, struct uv__work, wq);
 
    // ? 开始执行 uv__fs_work 工作
    w->work(w);

    uv_mutex_lock(&w->loop->wq_mutex);
    w->work = NULL;  /* Signal uv_cancel() that the work req is done
                        executing. */
    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
   
    // ? uv__fs_work 任务完成后, 开始通知对应 fd
    uv_async_send(&w->loop->wq_async);

    ...
  }
}

复制代码

uv_async_send

main > uv_fs_open > POST > uv__work_submit > init_once > init_threads > worker > uv_async_send

当该线程耗时的 w->work(即此例子的 uv__work_submit 中设置的 uv__fs_work)任务完成后, 该去通知主线程了

uv_async_send 会判断当前任务有没有其他线程在进行操作, 如果有其他线程正在调用 uv__async_send 发送消息, 则直接跳过, 最后有一个线程通知到主线程即可

该函数中调用的 cmpxchgi 函数实现又是干什么的了。

int uv_async_send(uv_async_t* handle) {
  /* Do a cheap read first. */
  if (ACCESS_ONCE(int, handle->pending) != 0)
    return 0;

  /* Tell the other thread we're busy with the handle. */
  if (cmpxchgi(&handle->pending, 0, 1) != 0)
    return 0;

  /* Wake up the other thread's event loop. */
  uv__async_send(handle->loop);

  /* Tell the other thread we're done. */
  if (cmpxchgi(&handle->pending, 1, 2) != 1)
    abort();

  return 0;
}
复制代码

cmpxchgi

main > uv_fs_open > POST > uv__work_submit > init_once > init_threads > worker > uv_async_send > cmpxchgi

内联汇编语句看不懂不要紧, 我们可以看看等价的 __sync_val_compare_and_swap 函数的作用就好了

UV_UNUSED(static int cmpxchgi(int* ptr, int oldval, int newval)) {
#if defined(__i386__) || defined(__x86_64__)
  int out;
  __asm__ __volatile__ ("lock; cmpxchg %2, %1;"
                        : "=a" (out), "+m" (*(volatile int*) ptr)
                        : "r" (newval), "0" (oldval)
                        : "memory");
  return out;
#elif defined(__MVS__)
  unsigned int op4;
  if (__plo_CSST(ptr, (unsigned int*) &oldval, newval,
                (unsigned int*) ptr, *ptr, &op4))
    return oldval;
  else
    return op4;
#elif defined(__SUNPRO_C) || defined(__SUNPRO_CC)
  return atomic_cas_uint((uint_t *)ptr, (uint_t)oldval, (uint_t)newval);
#else
  return __sync_val_compare_and_swap(ptr, oldval, newval);
#endif
}
复制代码

__sync_val_compare_and_swap

main > uv_fs_open > POST > uv__work_submit > init_once > init_threads > worker > uv_async_send > cmpxchgi > __sync_val_compare_and_swap

提供原子的比较和交换,如果*ptr == oldval,就将 newval 写入 *ptr。看完解释还是能理解一点了, 类似于赋值操作, 那么回到主线看看 uv__async_send 函数吧

type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)
复制代码

uv__async_send

main > uv_fs_open > POST > uv__work_submit > init_once > init_threads > worker > uv_async_send > uv__async_send

可见主要是在 loop->async_io_watcher.fd 上面写了数据, 此时 epoll 该登场了, 它就是负责观察所有通过 epoll_ctl 函数注册的 fd,当某个 fd 变化,就通知到对应的 i/o 观察者。

等等再说 epoll 之前, 先看一下我们的 i/o 观察者是在何时注册的 ?

static void uv__async_send(uv_loop_t* loop) {
  const void* buf;
  ssize_t len;
  int fd;
  int r;

  buf = "";
  len = 1;
  fd = loop->async_wfd;

#if defined(__linux__)
  if (fd == -1) {
    static const uint64_t val = 1;
    buf = &val;
    len = sizeof(val);
    // ? 注意该处的 fd 挂载在 loop->async_io_watcher.fd 上
    fd = loop->async_io_watcher.fd;  /* eventfd */
  }
#endif

  do
  	// ? fd 上面写入数据, 通知主线程
    r = write(fd, buf, len);
  while (r == -1 && errno == EINTR);

  if (r == len)
    return;

  if (r == -1)
    if (errno == EAGAIN || errno == EWOULDBLOCK)
      return;

  abort();
}
复制代码

uv__io_start – 注册 i/o 观察者

main > uv_default_loop > uv_loop_init > uv_async_init > uv__async_start > uv__io_start

调用 uv__io_start 函数注册一个 i/o 观察者。那么该 i/o 观察者需要观察的 fd 是何时设置的了 ?

void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  ...

  if (QUEUE_EMPTY(&w->watcher_queue))
    QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue);

  if (loop->watchers[w->fd] == NULL) {
    loop->watchers[w->fd] = w;
    loop->nfds++;
  }
}
复制代码

uv__io_init – 初始化 i/o 观察者

main > uv_default_loop > uv_loop_init > uv_async_init > uv__async_start > uv__io_init

注意本例子的目的是主线程希望知道什么时候子线程完成了任务, 所以我们需要先获得一个线程通信的 fd 用来观察就行了

可以看到在 uv__async_start 函数中是通过 eventfd 拿到线程通信的 fd, 然后通过调用 uv__io_init 给挂载在观察者上。同时设置了该观察者的回调函数为 uv__async_io 。

此时我们已经成功初始化了一个 i/o 观察者, 接下来就等着有数据写入时, epoll 捕获到调用观察者设置的回调函数就好了。

static int uv__async_start(uv_loop_t* loop) {
 ...
#ifdef __linux__
  err = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
  if (err < 0)
    return UV__ERR(errno);

  pipefd[0] = err;
  pipefd[1] = -1;
#else
  err = uv__make_pipe(pipefd, UV_NONBLOCK_PIPE);
  if (err < 0)
    return err;
#endif

  uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]);
  uv__io_start(loop, &loop->async_io_watcher, POLLIN);
  loop->async_wfd = pipefd[1];

  return 0;
}
复制代码

uv__async_io – 异步 i/o 观察者回调

uv__async_io 首先会把其他线程写入的数据给读完, 最后调用了 h->async_cb 函数。

其中的 h->async_cb 函数又是什么东西了?

static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  char buf[1024];
  ssize_t r;
  QUEUE queue;
  QUEUE* q;
  uv_async_t* h;

  assert(w == &loop->async_io_watcher);

  for (;;) {
    r = read(w->fd, buf, sizeof(buf));

    if (r == sizeof(buf))
      continue;

    if (r != -1)
      break;

    if (errno == EAGAIN || errno == EWOULDBLOCK)
      break;

    if (errno == EINTR)
      continue;

    abort();
  }

  QUEUE_MOVE(&loop->async_handles, &queue);
  while (!QUEUE_EMPTY(&queue)) {
    q = QUEUE_HEAD(&queue);
    h = QUEUE_DATA(q, uv_async_t, queue);

    QUEUE_REMOVE(q);
    QUEUE_INSERT_TAIL(&loop->async_handles, q);

    if (0 == uv__async_spin(h))
      continue;  /* Not pending. */

    if (h->async_cb == NULL)
      continue;

    h->async_cb(h);
  }
}
复制代码

uv_async_init

main > uv_default_loop > uv_loop_init > uv_async_init

h->async_cb 函数发现是在 uv_async_init 中设置的, 需要再往上查找调用 uv_async_init 的地方才能知道第三个参数 async_cb 的真实的值。

int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
  int err;

  err = uv__async_start(loop);
  if (err)
    return err;

  uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC);
  handle->async_cb = async_cb;
  handle->pending = 0;

  QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue);
  uv__handle_start(handle);

  return 0;
}
复制代码

uv_loop_init

main > uv_default_loop > uv_loop_init

uv__async_io 函数的 h->async_cb(h),其实是 uv_loop_init 函数里面设置的 uv__work_done

int uv_loop_init(uv_loop_t* loop) {
  ....
  err = uv_async_init(loop, &loop->wq_async, uv__work_done);
  ...
}
复制代码

uv__work_done

uv__work_done 主要是调用了 w->done(w, err), 等等, 这不就是 uv__work_submit 函数中设置的 uv__fs_done 函数吗!

void uv__work_done(uv_async_t* handle) {
  ...

  while (!QUEUE_EMPTY(&wq)) {
    q = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(q);

    w = container_of(q, struct uv__work, wq);
    err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
    w->done(w, err);
  }
}
复制代码

uv__fs_done

uv__fs_done 函数主要调用的是 req->cb(req), 这就是 INIT 中设置的例子中的 on_open 函数了!

static void uv__fs_done(struct uv__work* w, int status) {
  uv_fs_t* req;

  req = container_of(w, uv_fs_t, work_req);
  uv__req_unregister(req->loop, req);

  if (status == UV_ECANCELED) {
    assert(req->result == 0);
    req->result = UV_ECANCELED;
  }

  req->cb(req);
}
复制代码

epoll – 概念

epoll是Linux内核为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。epoll除了提供select/poll那种IO事件的水平触发(Level Triggered)外,还提供了边缘触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。

int epoll_create(int size);  
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);  
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
复制代码
  1. epoll_create – 在内核中创建epoll实例并返回一个epoll文件描述符。 在最初的实现中,调用者通过 size 参数告知内核需要监听的文件描述符数量。如果监听的文件描述符数量超过 size, 则内核会自动扩容。而现在 size 已经没有这种语义了,但是调用者调用时 size 依然必须大于 0,以保证后向兼容性。
  2. epoll_ctl – 向 epfd 对应的内核epoll 实例添加、修改或删除对 fd 上事件 event 的监听。op 可以为 EPOLL_CTL_ADD, EPOLL_CTL_MOD, EPOLL_CTL_DEL 分别对应的是添加新的事件,修改文件描述符上监听的事件类型,从实例上删除一个事件。如果 event 的 events 属性设置了 EPOLLET flag,那么监听该事件的方式是边缘触发。
  3. epoll_wait – 当 timeout 为 0 时,epoll_wait 永远会立即返回。而 timeout 为 -1 时,epoll_wait 会一直阻塞直到任一已注册的事件变为就绪。当 timeout 为一正整数时,epoll 会阻塞直到计时 timeout 毫秒终了或已注册的事件变为就绪。因为内核调度延迟,阻塞的时间可能会略微超过 timeout 毫秒。

epoll_create – 创建epoll对象

main > uv_default_loop > uv_loop_init > uv__platform_loop_init > epoll_create

如下图所示,当某个进程调用epoll_create方法时,内核会创建一个eventpoll对象(也就是程序中epfd所代表的对象)。eventpoll对象也是文件系统中的一员,和socket一样,它也会有等待队列。

可见 libuv 是在 uv__platform_loop_init 函数中进行创建的。

int uv__platform_loop_init(uv_loop_t* loop) {
  int fd;
  // ? 此处进行 epoll_create fd 的创建
  fd = epoll_create1(O_CLOEXEC);

  /* epoll_create1() can fail either because it's not implemented (old kernel)
   * or because it doesn't understand the O_CLOEXEC flag.
   */
  if (fd == -1 && (errno == ENOSYS || errno == EINVAL)) {
    fd = epoll_create(256);

    if (fd != -1)
      uv__cloexec(fd, 1);
  }

  loop->backend_fd = fd;
  loop->inotify_fd = -1;
  loop->inotify_watchers = NULL;

  if (fd == -1)
    return UV__ERR(errno);

  return 0;
}
复制代码

epoll_ctl – 维护监视列表

main > uv_run > uv__io_poll >

创建epoll对象后,可以用epoll_ctl添加或删除所要监听的socket。以添加socket为例,如下图,如果通过epoll_ctl添加sock1、sock2和sock3的监视,内核会将eventpoll添加到这三个socket的等待队列中。

libuv 中在 uv__io_poll 函数里被调用。

uv__io_poll: 函数是上节 【libuv 源码学习笔记】1. 事件循环 事件循环中非常重要的一个阶段。

void uv__io_poll(uv_loop_t* loop, int timeout) {
  ...
  while (!QUEUE_EMPTY(&loop->watcher_queue)) {
    ...
    // ? epoll_ctl - 事件注册函数,注册新的fd到epfd的epool对象空间中,并指明event可读写
    if (epoll_ctl(loop->backend_fd, op, w->fd, &e)) {
      if (errno != EEXIST)
        abort();

      assert(op == EPOLL_CTL_ADD);

      /* We've reactivated a file descriptor that's been watched before. */
      if (epoll_ctl(loop->backend_fd, EPOLL_CTL_MOD, w->fd, &e))
        abort();
    }

    w->events = w->pevents;
  }
  ...
}
复制代码

epoll_wait

main > uv_run > uv__io_poll >

当socket收到数据后,中断程序会给eventpoll的“就绪列表”添加socket引用。如下图展示的是sock2和sock3收到数据后,中断程序让rdlist引用这两个socket。

当程序执行到epoll_wait时,如果rdlist已经引用了socket,那么epoll_wait直接返回,如果rdlist为空,阻塞进程。

这也是上一节事件循环中, 如果没有其他阶段没有任务, 会使进程一直阻塞到一个 timer 超时的时间。

libuv 中在 uv__io_poll 函数里被调用。

void uv__io_poll(uv_loop_t* loop, int timeout) {
  ...
  for (;;) {
    // 
    if (no_epoll_wait != 0 || (sigmask != 0 && no_epoll_pwait == 0)) {
      // ? epoll_wait - 阻塞直到任一已注册的事件变为就绪
      nfds = epoll_pwait(loop->backend_fd,
                         events,
                         ARRAY_SIZE(events),
                         timeout,
                         &sigset);
      if (nfds == -1 && errno == ENOSYS) {
        uv__store_relaxed(&no_epoll_pwait_cached, 1);
        no_epoll_pwait = 1;
      }
    } else {
      // ? epoll_wait - 阻塞直到任一已注册的事件变为就绪
      nfds = epoll_wait(loop->backend_fd,
                        events,
                        ARRAY_SIZE(events),
                        timeout);
      if (nfds == -1 && errno == ENOSYS) {
        uv__store_relaxed(&no_epoll_wait_cached, 1);
        no_epoll_wait = 1;
      }
    }
	...
	// ? 遍历被内核IO事件异步唤醒而加入Ready队列的描述符集合
    for (i = 0; i < nfds; i++) {
      pe = events + i;
      fd = pe->data.fd;
      ...
      if (pe->events != 0) {
        /* Run signal watchers last.  This also affects child process watchers
         * because those are implemented in terms of signal watchers.
         */
        if (w == &loop->signal_io_watcher) {
          have_signals = 1;
        } else {
          uv__metrics_update_idle_time(loop);
          // ? 调用观察者注册的回调
          w->cb(loop, w, pe->events);
        }

        nevents++;
      }
    }
	...
}
复制代码

uv_fs_req_cleanup

main > uv_fs_req_cleanup

当 uv_fs_open 回调 on_open 被调用, 本例子中的事件循环 uv_run 函数运行结束, 代码开始运行 uv_fs_req_cleanup, 进行垃圾回收, 本次程序顺利退出。

小结

Q: 本例子中一个耗时的 fs_open 的任务如何通过异步的方式实现 ?

A: 程序先会初始化一次线程池, 任务队列为空时线程都进入沉睡状态。当调用 uv_fs_open 方法提交一个 fs_open 任务时, 会通过 pthread_cond_signal 唤醒一个线程开始工作, 工作内容即为 fs_open, 在该线程内同步等待 fs_open 函数结束, 然后去通知主线程。

Q: 那么如何通知主线程了

A: 耗时的任务原来交给了其他线程, 主线程被通知其实是首先通过 eventfd 获取到了一个线程通信的 fd, 然后通过 epoll 机制去注册一个 i/o 观察者, 当其他线程任务完成后, 向该 fd 写入数据。被 epoll 捕获到调用主线程早已设置好的回调函数就好了。

原文链接: github.com/xiaoxiaojx/… 不定时会更新, 感兴趣的大佬可以点个 Star 支持, 谢谢阅读 ~

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享