NodeJs 信号

image

前言

开始写一些博客主要是在看完代码后再温故总结一遍, 也是为了后面回头也能查阅。本系列会从官网的例子出发, 尽可能以链路追踪的方式记录其中源码核心模块的实现, 本篇例子来源

涉及的知识点

例子 signal/main.c

创建了两个子线程, 而 linux 提供的 sigaction 函数一个 signum 只能有一个监听函数, 那么多进程多线程如何做到只设置一次通知所有监听函数了?

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <uv.h>

uv_loop_t* create_loop()
{
    uv_loop_t *loop = malloc(sizeof(uv_loop_t));
    if (loop) {
      uv_loop_init(loop);
    }
    return loop;
}

void signal_handler(uv_signal_t *handle, int signum)
{
    printf("Signal received: %d\n", signum);
    uv_signal_stop(handle);
}

// two signal handlers in one loop
void thread1_worker(void *userp)
{
    uv_loop_t *loop1 = create_loop();

    uv_signal_t sig1a, sig1b;
    uv_signal_init(loop1, &sig1a);
    uv_signal_start(&sig1a, signal_handler, SIGUSR1);

    uv_signal_init(loop1, &sig1b);
    uv_signal_start(&sig1b, signal_handler, SIGUSR1);

    uv_run(loop1, UV_RUN_DEFAULT);
}

// two signal handlers, each in its own loop
void thread2_worker(void *userp)
{
    uv_loop_t *loop2 = create_loop();
    uv_loop_t *loop3 = create_loop();

    uv_signal_t sig2;
    uv_signal_init(loop2, &sig2);
    uv_signal_start(&sig2, signal_handler, SIGUSR1);

    uv_signal_t sig3;
    uv_signal_init(loop3, &sig3);
    uv_signal_start(&sig3, signal_handler, SIGUSR1);

    while (uv_run(loop2, UV_RUN_NOWAIT) || uv_run(loop3, UV_RUN_NOWAIT)) {
    }
}

int main()
{
    printf("PID %d\n", getpid());

    uv_thread_t thread1, thread2;

    uv_thread_create(&thread1, thread1_worker, 0);
    uv_thread_create(&thread2, thread2_worker, 0);

    uv_thread_join(&thread1);
    uv_thread_join(&thread2);
    return 0;
}
复制代码

关于该例子中的 SIGUSR1 信号, 为用户自定义信号1

  • 比如向进程 12345 发送信号 10
$ kill -10 12345
复制代码
  • 那么向进程 12345 发送自定义信号1 则可以通过下面的命令
$ kill -SIGUSR1 12345
复制代码

uv_signal_init

thread1_worker > uv_signal_init

对 loop 和 handle 进行一些数据初始化操作, 主要调用了 uv__signal_loop_once_init 函数。

int uv_signal_init(uv_loop_t* loop, uv_signal_t* handle) {
  int err;

  err = uv__signal_loop_once_init(loop);
  if (err)
    return err;

  uv__handle_init(loop, (uv_handle_t*) handle, UV_SIGNAL);
  handle->signum = 0;
  handle->caught_signals = 0;
  handle->dispatched_signals = 0;

  return 0;
}
复制代码

uv__signal_loop_once_init

thread1_worker > uv_signal_init > uv__signal_loop_once_init

  1. 如果已经有通信的 fd 则直接返回
  2. uv__make_pipe 函数在 【libuv 源码学习笔记】子进程与ipc 就分析过, 函数里面主要是调用 pipe2 函数

pipe2: 创建一个管道,一个单向的数据通道,可以 用于进程间通信。数组 pipefd 用于 返回两个指向管道末端的文件描述符。 pipefd[0] 指的是管道的读取端。 pipefd[1] 指的是 到管道的写端。写入到写端的数据 管道由内核缓冲,直到从 read 中读取 管道的末端。

  1. 调用 uv__io_init 初始化一个 i/o 观察者, 其观察者的回调函数为 uv__signal_event, 需要观察的 fd 为上面 pipe2 拿到读端的 fd。i/o 相关实现可参考 【libuv 源码学习笔记】线程池与i/o
  2. 调用 uv__io_start 注册刚才初始化完成的 i/o 观察者。
static int uv__signal_loop_once_init(uv_loop_t* loop) {
  int err;

  /* Return if already initialized. */
  if (loop->signal_pipefd[0] != -1)
    return 0;

  err = uv__make_pipe(loop->signal_pipefd, UV_NONBLOCK_PIPE);
  if (err)
    return err;

  uv__io_init(&loop->signal_io_watcher,
              uv__signal_event,
              loop->signal_pipefd[0]);
  uv__io_start(loop, &loop->signal_io_watcher, POLLIN);

  return 0;
}
复制代码

uv_signal_start

thread1_worker > uv_signal_start

uv_signal_start 函数里面主要是调用了 uv_signal_start 方法, libuv 中有大量相似度极高的函数名 …

  1. 如果发现该 handle 的 signum 已经注册则直接返回
  2. 调用 uv__signal_block_and_lock 就行类似互斥锁的锁定
  3. 调用 uv__signal_first_handle 函数, 如果该 signum 已经设置了监听函数则不再调用 uv__signal_register_handler 函数
  4. 调用 uv__signal_register_handler 函数给 signum 注册监听函数
  5. 通过 RB_INSERT 把该 handle 加入到树中。
  6. 调用 uv__signal_unlock_and_unblock 进行解锁, 即会 write 一次数据, 使其他等待的线程能够从 uv__signal_block_and_lock 函数往下运行。
static int uv__signal_start(uv_signal_t* handle,
                            uv_signal_cb signal_cb,
                            int signum,
                            int oneshot) {
 ...

  if (signum == handle->signum) {
    handle->signal_cb = signal_cb;
    return 0;
  }

  /* If the signal handler was already active, stop it first. */
  if (handle->signum != 0) {
    uv__signal_stop(handle);
  }

  uv__signal_block_and_lock(&saved_sigmask);

  first_handle = uv__signal_first_handle(signum);
  if (first_handle == NULL ||
      (!oneshot && (first_handle->flags & UV_SIGNAL_ONE_SHOT))) {
    err = uv__signal_register_handler(signum, oneshot);
    if (err) {
      /* Registering the signal handler failed. Must be an invalid signal. */
      uv__signal_unlock_and_unblock(&saved_sigmask);
      return err;
    }
  }

  handle->signum = signum;
  if (oneshot)
    handle->flags |= UV_SIGNAL_ONE_SHOT;

  RB_INSERT(uv__signal_tree_s, &uv__signal_tree, handle);

  uv__signal_unlock_and_unblock(&saved_sigmask);

  handle->signal_cb = signal_cb;
  uv__handle_start(handle);

  return 0;
}
复制代码

uv__signal_block_and_lock

thread1_worker > uv_signal_start > uv__signal_block_and_lock

  • sigfillset: 该函数的作用是将信号集初始化为空。
  • pthread_sigmask: 在多线程的程序里,希望只在主线程中处理信号,可以使用该函数。每个线程均有自己的信号屏蔽集(信号掩码),可以使用pthread_sigmask函数来屏蔽某个线程对某些信号的响应处理,仅留下需要处理该信号的线程来处理指定的信号。

通过 pthread_sigmask 的例子可以看见主要是对信号集进行了初始化的操作, 然后调用了 uv__signal_lock 函数。

//pthread_sigmask 的例子

sigemptyset(&set);
sigaddset(&set, SIGQUIT);
sigaddset(&set, SIGUSR1);
s = pthread_sigmask(SIG_BLOCK, &set, NULL);
复制代码
static void uv__signal_block_and_lock(sigset_t* saved_sigmask) {
  sigset_t new_mask;

  if (sigfillset(&new_mask))
    abort();

  /* to shut up valgrind */
  sigemptyset(saved_sigmask);
  if (pthread_sigmask(SIG_SETMASK, &new_mask, saved_sigmask))
    abort();

  if (uv__signal_lock())
    abort();
}
复制代码

uv__signal_lock

thread1_worker > uv_signal_start > uv__signal_block_and_lock > uv__signal_lock

通过 read 读取 uv__signal_lock_pipefd[0], 当出现 EINTR 出错时, 就会尝试轮询重试。EINTR 错误一般出现在当正在进行系统调用时, 此时发送了一个 signal。

如果在系统调用正在进行时发生信号,许多系统调用将报告 EINTR 错误代码。实际上没有发生错误,只是因为系统无法自动恢复系统调用而以这种方式报告。这种编码模式只是在发生这种情况时重试系统调用,以忽略中断。

static int uv__signal_lock(void) {
  int r;
  char data;

  do {
    r = read(uv__signal_lock_pipefd[0], &data, sizeof data);
  } while (r < 0 && errno == EINTR);

  return (r < 0) ? -1 : 0;
}
复制代码

那么何时 read 到数据让程序继续往下运行了 ?

此时感觉头绪有点断了, 那么从一开始在理一下, 是不是忽略了什么细节。最后在 create_loop > uv_loop_init > uv__signal_global_once_init > uv__signal_global_init 函数中找到了 write 数据的地方。

uv__signal_global_init 分析

函数里面调用了如果 uv__signal_lock_pipefd 未设置, 则调用 pthread_atfork 函数

pthread_atfork 前两个参数是调用 fork 函数产生子进程时的before, after 父进程里面会运行的勾子函数, 第三个参数是子进程会运行的勾子函数。

原来是创建子进程时会调用 uv__signal_global_reinit 一次, 本例子是创建了线程故不会进入这个场景, 最后只运行了一次 uv__signal_global_reinit 函数。

static void uv__signal_global_init(void) {
  if (uv__signal_lock_pipefd[0] == -1)
    // https://man7.org/linux/man-pages/man3/pthread_atfork.3.html
    if (pthread_atfork(NULL, NULL, &uv__signal_global_reinit))
      abort();

  uv__signal_global_reinit();
}
复制代码

uv__signal_global_reinit

create_loop > uv_loop_init > uv__signal_global_once_init > uv__signal_global_init > uv__signal_global_reinit

原来是一个主线程里面会调用一次 uv__signal_global_reinit 函数, 去通过 uv__make_pipe 创建一个通信的管道, 并且最后会调用 uv__signal_unlock 去 write 一次数据。 这样在上面说到的当有一个线程进入 uv__signal_lock 逻辑时就会 read 到数据, 程序继续往下运行, 其他线程则会继续陷入等待, 达到互斥锁的目的。有点没想明白不直接使用互斥锁的原因 …

当pthread_mutex_lock()返回时,该互斥锁已被锁定。线程调用该函数让互斥锁上锁,如果该互斥锁已被另一个线程锁定和拥有,则调用该线程将阻塞,直到该互斥锁变为可用为止。

static void uv__signal_global_reinit(void) {
  uv__signal_cleanup();

  if (uv__make_pipe(uv__signal_lock_pipefd, 0))
    abort();

  if (uv__signal_unlock())
    abort();
}

static int uv__signal_unlock(void) {
  int r;
  char data = 42;

  do {
    r = write(uv__signal_lock_pipefd[1], &data, sizeof data);
  } while (r < 0 && errno == EINTR);

  return (r < 0) ? -1 : 0;
}
复制代码

uv__signal_first_handle

thread1_worker > uv_signal_start > uv__signal_first_handle

回到主线, 通过 RB_NFIND 查找该 signum 是否已经设置监听函数, 主要是确保一个 signum 只有一个监听函数。其主要原因是上面说的 sigaction 只能给一个 signum 绑定一个监听函数。

static uv_signal_t* uv__signal_first_handle(int signum) {
  /* This function must be called with the signal lock held. */
  uv_signal_t lookup;
  uv_signal_t* handle;

  lookup.signum = signum;
  lookup.flags = 0;
  lookup.loop = NULL;

  handle = RB_NFIND(uv__signal_tree_s, &uv__signal_tree, &lookup);

  if (handle != NULL && handle->signum == signum)
    return handle;

  return NULL;
}
复制代码

RB_NFIND

thread1_worker > uv_signal_start > uv__signal_first_handle > RB_NFIND

和 QUEUE 一样都是通过一组宏定义实现的, 代码在 deps/uv/include/uv/tree.h 文件中。

在这里 signum 都是数字形式, 通过红黑树结构能够高效的查找于遍历。

uv__signal_register_handler

thread1_worker > uv_signal_start > uv__signal_register_handler

设置该 signum 的信号处理函数为 uv__signal_handler

sa_flags:用来设置信号处理的其他相关操作,下列的数值可用。可用OR 运算(|)组合

  • A_NOCLDSTOP:如果参数signum为SIGCHLD,则当子进程暂停时并不会通知父进程
  • SA_ONESHOT/SA_RESETHAND:当调用新的信号处理函数前,将此信号处理方式改为系统预设的方式
  • SA_RESTART:被信号中断的系统调用会自行重启
  • SA_NOMASK/SA_NODEFER:在处理此信号未结束前不理会此信号的再次到来
static int uv__signal_register_handler(int signum, int oneshot) {
  /* When this function is called, the signal lock must be held. */
  struct sigaction sa;

  /* XXX use a separate signal stack? */
  memset(&sa, 0, sizeof(sa));
  if (sigfillset(&sa.sa_mask))
    abort();
  sa.sa_handler = uv__signal_handler;
  sa.sa_flags = SA_RESTART;
  if (oneshot)
    sa.sa_flags |= SA_RESETHAND;

  /* XXX save old action so we can restore it later on? */
  if (sigaction(signum, &sa, NULL))
    return UV__ERR(errno);

  return 0;
}
复制代码

uv__signal_handler

thread1_worker > uv_signal_start > uv__signal_register_handler > uv__signal_handler

作为唯一的信号处理函数, 让我们来看看 uv__signal_handler 的实现

  1. 通过 RB_NEXT 遍历拿出之前插入属性值 signum 等于当前接受到的信号 signum 的 handle。
  2. 在该 handle 的通信的 fd 写端写入数据。
  3. 剩下的就该知道发生啥事了, 在事件循环阶段五 Poll for I/O 阶段, epoll 等待写入事件成功后, 通知到上面通过 uv__io_init 设置的 i/o 观察者, 调用 i/o 观察者的回调函数, 即该例子的 uv__signal_event 函数。
static void uv__signal_handler(int signum) {
  ...

  for (handle = uv__signal_first_handle(signum);
       handle != NULL && handle->signum == signum;
       handle = RB_NEXT(uv__signal_tree_s, &uv__signal_tree, handle)) {
    int r;

    msg.signum = signum;
    msg.handle = handle;

    /* write() should be atomic for small data chunks, so the entire message
     * should be written at once. In theory the pipe could become full, in
     * which case the user is out of luck.
     */
    do {
      r = write(handle->loop->signal_pipefd[1], &msg, sizeof msg);
    } while (r == -1 && errno == EINTR);

    assert(r == sizeof msg ||
           (r == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)));

    if (r != -1)
      handle->caught_signals++;
  }

  uv__signal_unlock();
  errno = saved_errno;
}
复制代码

uv__signal_event

thread1_worker > uv_signal_init > uv__signal_loop_once_init > uv__signal_event

信号 i/o 设置的回调函数。

  1. 循环读取所有写入的消息, 可能有多条消息。
  2. 如果该消息的 signum 是需要监听的, 则调用 handle->signal_cb 回调函数。
static void uv__signal_event(uv_loop_t* loop,
                             uv__io_t* w,
                             unsigned int events) {
  uv__signal_msg_t* msg;
  uv_signal_t* handle;
  char buf[sizeof(uv__signal_msg_t) * 32];
  size_t bytes, end, i;
  int r;

  bytes = 0;
  end = 0;

  do {
    r = read(loop->signal_pipefd[0], buf + bytes, sizeof(buf) - bytes);

    if (r == -1 && errno == EINTR)
      continue;

    if (r == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
      /* If there are bytes in the buffer already (which really is extremely
       * unlikely if possible at all) we can't exit the function here. We'll
       * spin until more bytes are read instead.
       */
      if (bytes > 0)
        continue;

      /* Otherwise, there was nothing there. */
      return;
    }

    /* Other errors really should never happen. */
    if (r == -1)
      abort();

    bytes += r;

    /* `end` is rounded down to a multiple of sizeof(uv__signal_msg_t). */
    end = (bytes / sizeof(uv__signal_msg_t)) * sizeof(uv__signal_msg_t);

    for (i = 0; i < end; i += sizeof(uv__signal_msg_t)) {
      msg = (uv__signal_msg_t*) (buf + i);
      handle = msg->handle;

      if (msg->signum == handle->signum) {
        assert(!(handle->flags & UV_HANDLE_CLOSING));
        handle->signal_cb(handle, handle->signum);
      }

      handle->dispatched_signals++;

      if (handle->flags & UV_SIGNAL_ONE_SHOT)
        uv__signal_stop(handle);
    }

    bytes -= end;

    /* If there are any "partial" messages left, move them to the start of the
     * the buffer, and spin. This should not happen.
     */
    if (bytes) {
      memmove(buf, buf + end, bytes);
      continue;
    }
  } while (end == sizeof buf);
}
复制代码

小结

只需在第一个调用 uv__signal_start 函数的时候注册一个信号处理函数, 当收到信号时, 该函数会遍历红黑树中所有关注该 signum 的 handle, 然后向该 handle 通过 pipe2 申请的通信 fd 的写端写入数据, 事件循环阶段被 epoll 捕获通知到该 handle 的 i/o 观察者, 最后调用观察者的回调, 达到通知所有监听函数的目的。

阅读更多: github.com/xiaoxiaojx/… 不定时会更新, 感兴趣的大佬可以点个 Star 支持, 谢谢阅读 ~

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享