。
前言
开始写一些博客主要是在看完代码后再温故总结一遍, 也是为了后面回头也能查阅。本系列会从官网的例子出发, 尽可能以链路追踪的方式记录其中源码核心模块的实现, 本篇例子来源
涉及的知识点
- libuv 中信号的实现
- Unix信号 信号列表
- 红黑树 在信号中的使用
- sigaction(2) — Linux manual page
- Checking if errno != EINTR: what does it mean?
- pipe(2) — Linux manual page
- pthread_atfork(3) — Linux manual page
- pthread_sigmask(3) — Linux manual page
例子 signal/main.c
创建了两个子线程, 而 linux 提供的 sigaction 函数一个 signum 只能有一个监听函数, 那么多进程多线程如何做到只设置一次通知所有监听函数了?
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <uv.h>
uv_loop_t* create_loop()
{
uv_loop_t *loop = malloc(sizeof(uv_loop_t));
if (loop) {
uv_loop_init(loop);
}
return loop;
}
void signal_handler(uv_signal_t *handle, int signum)
{
printf("Signal received: %d\n", signum);
uv_signal_stop(handle);
}
// two signal handlers in one loop
void thread1_worker(void *userp)
{
uv_loop_t *loop1 = create_loop();
uv_signal_t sig1a, sig1b;
uv_signal_init(loop1, &sig1a);
uv_signal_start(&sig1a, signal_handler, SIGUSR1);
uv_signal_init(loop1, &sig1b);
uv_signal_start(&sig1b, signal_handler, SIGUSR1);
uv_run(loop1, UV_RUN_DEFAULT);
}
// two signal handlers, each in its own loop
void thread2_worker(void *userp)
{
uv_loop_t *loop2 = create_loop();
uv_loop_t *loop3 = create_loop();
uv_signal_t sig2;
uv_signal_init(loop2, &sig2);
uv_signal_start(&sig2, signal_handler, SIGUSR1);
uv_signal_t sig3;
uv_signal_init(loop3, &sig3);
uv_signal_start(&sig3, signal_handler, SIGUSR1);
while (uv_run(loop2, UV_RUN_NOWAIT) || uv_run(loop3, UV_RUN_NOWAIT)) {
}
}
int main()
{
printf("PID %d\n", getpid());
uv_thread_t thread1, thread2;
uv_thread_create(&thread1, thread1_worker, 0);
uv_thread_create(&thread2, thread2_worker, 0);
uv_thread_join(&thread1);
uv_thread_join(&thread2);
return 0;
}
复制代码
关于该例子中的 SIGUSR1 信号, 为用户自定义信号1
- 比如向进程 12345 发送信号 10
$ kill -10 12345
复制代码
- 那么向进程 12345 发送自定义信号1 则可以通过下面的命令
$ kill -SIGUSR1 12345
复制代码
uv_signal_init
thread1_worker > uv_signal_init
对 loop 和 handle 进行一些数据初始化操作, 主要调用了 uv__signal_loop_once_init 函数。
int uv_signal_init(uv_loop_t* loop, uv_signal_t* handle) {
int err;
err = uv__signal_loop_once_init(loop);
if (err)
return err;
uv__handle_init(loop, (uv_handle_t*) handle, UV_SIGNAL);
handle->signum = 0;
handle->caught_signals = 0;
handle->dispatched_signals = 0;
return 0;
}
复制代码
uv__signal_loop_once_init
thread1_worker > uv_signal_init > uv__signal_loop_once_init
- 如果已经有通信的 fd 则直接返回
- uv__make_pipe 函数在 【libuv 源码学习笔记】子进程与ipc 就分析过, 函数里面主要是调用 pipe2 函数
pipe2: 创建一个管道,一个单向的数据通道,可以 用于进程间通信。数组 pipefd 用于 返回两个指向管道末端的文件描述符。 pipefd[0] 指的是管道的读取端。 pipefd[1] 指的是 到管道的写端。写入到写端的数据 管道由内核缓冲,直到从 read 中读取 管道的末端。
- 调用 uv__io_init 初始化一个 i/o 观察者, 其观察者的回调函数为 uv__signal_event, 需要观察的 fd 为上面 pipe2 拿到读端的 fd。i/o 相关实现可参考 【libuv 源码学习笔记】线程池与i/o
- 调用 uv__io_start 注册刚才初始化完成的 i/o 观察者。
static int uv__signal_loop_once_init(uv_loop_t* loop) {
int err;
/* Return if already initialized. */
if (loop->signal_pipefd[0] != -1)
return 0;
err = uv__make_pipe(loop->signal_pipefd, UV_NONBLOCK_PIPE);
if (err)
return err;
uv__io_init(&loop->signal_io_watcher,
uv__signal_event,
loop->signal_pipefd[0]);
uv__io_start(loop, &loop->signal_io_watcher, POLLIN);
return 0;
}
复制代码
uv_signal_start
thread1_worker > uv_signal_start
uv_signal_start 函数里面主要是调用了 uv_signal_start 方法, libuv 中有大量相似度极高的函数名 …
- 如果发现该 handle 的 signum 已经注册则直接返回
- 调用 uv__signal_block_and_lock 就行类似互斥锁的锁定
- 调用 uv__signal_first_handle 函数, 如果该 signum 已经设置了监听函数则不再调用 uv__signal_register_handler 函数
- 调用 uv__signal_register_handler 函数给 signum 注册监听函数
- 通过 RB_INSERT 把该 handle 加入到树中。
- 调用 uv__signal_unlock_and_unblock 进行解锁, 即会 write 一次数据, 使其他等待的线程能够从 uv__signal_block_and_lock 函数往下运行。
static int uv__signal_start(uv_signal_t* handle,
uv_signal_cb signal_cb,
int signum,
int oneshot) {
...
if (signum == handle->signum) {
handle->signal_cb = signal_cb;
return 0;
}
/* If the signal handler was already active, stop it first. */
if (handle->signum != 0) {
uv__signal_stop(handle);
}
uv__signal_block_and_lock(&saved_sigmask);
first_handle = uv__signal_first_handle(signum);
if (first_handle == NULL ||
(!oneshot && (first_handle->flags & UV_SIGNAL_ONE_SHOT))) {
err = uv__signal_register_handler(signum, oneshot);
if (err) {
/* Registering the signal handler failed. Must be an invalid signal. */
uv__signal_unlock_and_unblock(&saved_sigmask);
return err;
}
}
handle->signum = signum;
if (oneshot)
handle->flags |= UV_SIGNAL_ONE_SHOT;
RB_INSERT(uv__signal_tree_s, &uv__signal_tree, handle);
uv__signal_unlock_and_unblock(&saved_sigmask);
handle->signal_cb = signal_cb;
uv__handle_start(handle);
return 0;
}
复制代码
uv__signal_block_and_lock
thread1_worker > uv_signal_start > uv__signal_block_and_lock
- sigfillset: 该函数的作用是将信号集初始化为空。
- pthread_sigmask: 在多线程的程序里,希望只在主线程中处理信号,可以使用该函数。每个线程均有自己的信号屏蔽集(信号掩码),可以使用pthread_sigmask函数来屏蔽某个线程对某些信号的响应处理,仅留下需要处理该信号的线程来处理指定的信号。
通过 pthread_sigmask 的例子可以看见主要是对信号集进行了初始化的操作, 然后调用了 uv__signal_lock 函数。
//pthread_sigmask 的例子
sigemptyset(&set);
sigaddset(&set, SIGQUIT);
sigaddset(&set, SIGUSR1);
s = pthread_sigmask(SIG_BLOCK, &set, NULL);
复制代码
static void uv__signal_block_and_lock(sigset_t* saved_sigmask) {
sigset_t new_mask;
if (sigfillset(&new_mask))
abort();
/* to shut up valgrind */
sigemptyset(saved_sigmask);
if (pthread_sigmask(SIG_SETMASK, &new_mask, saved_sigmask))
abort();
if (uv__signal_lock())
abort();
}
复制代码
uv__signal_lock
thread1_worker > uv_signal_start > uv__signal_block_and_lock > uv__signal_lock
通过 read 读取 uv__signal_lock_pipefd[0], 当出现 EINTR 出错时, 就会尝试轮询重试。EINTR 错误一般出现在当正在进行系统调用时, 此时发送了一个 signal。
如果在系统调用正在进行时发生信号,许多系统调用将报告 EINTR 错误代码。实际上没有发生错误,只是因为系统无法自动恢复系统调用而以这种方式报告。这种编码模式只是在发生这种情况时重试系统调用,以忽略中断。
static int uv__signal_lock(void) {
int r;
char data;
do {
r = read(uv__signal_lock_pipefd[0], &data, sizeof data);
} while (r < 0 && errno == EINTR);
return (r < 0) ? -1 : 0;
}
复制代码
那么何时 read 到数据让程序继续往下运行了 ?
此时感觉头绪有点断了, 那么从一开始在理一下, 是不是忽略了什么细节。最后在 create_loop > uv_loop_init > uv__signal_global_once_init > uv__signal_global_init 函数中找到了 write 数据的地方。
uv__signal_global_init 分析
函数里面调用了如果 uv__signal_lock_pipefd 未设置, 则调用 pthread_atfork 函数
pthread_atfork 前两个参数是调用 fork 函数产生子进程时的before, after 父进程里面会运行的勾子函数, 第三个参数是子进程会运行的勾子函数。
原来是创建子进程时会调用 uv__signal_global_reinit 一次, 本例子是创建了线程故不会进入这个场景, 最后只运行了一次 uv__signal_global_reinit 函数。
static void uv__signal_global_init(void) {
if (uv__signal_lock_pipefd[0] == -1)
// https://man7.org/linux/man-pages/man3/pthread_atfork.3.html
if (pthread_atfork(NULL, NULL, &uv__signal_global_reinit))
abort();
uv__signal_global_reinit();
}
复制代码
uv__signal_global_reinit
create_loop > uv_loop_init > uv__signal_global_once_init > uv__signal_global_init > uv__signal_global_reinit
原来是一个主线程里面会调用一次 uv__signal_global_reinit 函数, 去通过 uv__make_pipe 创建一个通信的管道, 并且最后会调用 uv__signal_unlock 去 write 一次数据。 这样在上面说到的当有一个线程进入 uv__signal_lock 逻辑时就会 read 到数据, 程序继续往下运行, 其他线程则会继续陷入等待, 达到互斥锁的目的。有点没想明白不直接使用互斥锁的原因 …
当pthread_mutex_lock()返回时,该互斥锁已被锁定。线程调用该函数让互斥锁上锁,如果该互斥锁已被另一个线程锁定和拥有,则调用该线程将阻塞,直到该互斥锁变为可用为止。
static void uv__signal_global_reinit(void) {
uv__signal_cleanup();
if (uv__make_pipe(uv__signal_lock_pipefd, 0))
abort();
if (uv__signal_unlock())
abort();
}
static int uv__signal_unlock(void) {
int r;
char data = 42;
do {
r = write(uv__signal_lock_pipefd[1], &data, sizeof data);
} while (r < 0 && errno == EINTR);
return (r < 0) ? -1 : 0;
}
复制代码
uv__signal_first_handle
thread1_worker > uv_signal_start > uv__signal_first_handle
回到主线, 通过 RB_NFIND 查找该 signum 是否已经设置监听函数, 主要是确保一个 signum 只有一个监听函数。其主要原因是上面说的 sigaction 只能给一个 signum 绑定一个监听函数。
static uv_signal_t* uv__signal_first_handle(int signum) {
/* This function must be called with the signal lock held. */
uv_signal_t lookup;
uv_signal_t* handle;
lookup.signum = signum;
lookup.flags = 0;
lookup.loop = NULL;
handle = RB_NFIND(uv__signal_tree_s, &uv__signal_tree, &lookup);
if (handle != NULL && handle->signum == signum)
return handle;
return NULL;
}
复制代码
RB_NFIND
thread1_worker > uv_signal_start > uv__signal_first_handle > RB_NFIND
和 QUEUE 一样都是通过一组宏定义实现的, 代码在 deps/uv/include/uv/tree.h 文件中。
在这里 signum 都是数字形式, 通过红黑树结构能够高效的查找于遍历。
uv__signal_register_handler
thread1_worker > uv_signal_start > uv__signal_register_handler
设置该 signum 的信号处理函数为 uv__signal_handler
sa_flags:用来设置信号处理的其他相关操作,下列的数值可用。可用OR 运算(|)组合
- A_NOCLDSTOP:如果参数signum为SIGCHLD,则当子进程暂停时并不会通知父进程
- SA_ONESHOT/SA_RESETHAND:当调用新的信号处理函数前,将此信号处理方式改为系统预设的方式
- SA_RESTART:被信号中断的系统调用会自行重启
- SA_NOMASK/SA_NODEFER:在处理此信号未结束前不理会此信号的再次到来
static int uv__signal_register_handler(int signum, int oneshot) {
/* When this function is called, the signal lock must be held. */
struct sigaction sa;
/* XXX use a separate signal stack? */
memset(&sa, 0, sizeof(sa));
if (sigfillset(&sa.sa_mask))
abort();
sa.sa_handler = uv__signal_handler;
sa.sa_flags = SA_RESTART;
if (oneshot)
sa.sa_flags |= SA_RESETHAND;
/* XXX save old action so we can restore it later on? */
if (sigaction(signum, &sa, NULL))
return UV__ERR(errno);
return 0;
}
复制代码
uv__signal_handler
thread1_worker > uv_signal_start > uv__signal_register_handler > uv__signal_handler
作为唯一的信号处理函数, 让我们来看看 uv__signal_handler 的实现
- 通过 RB_NEXT 遍历拿出之前插入属性值 signum 等于当前接受到的信号 signum 的 handle。
- 在该 handle 的通信的 fd 写端写入数据。
- 剩下的就该知道发生啥事了, 在事件循环阶段五 Poll for I/O 阶段, epoll 等待写入事件成功后, 通知到上面通过 uv__io_init 设置的 i/o 观察者, 调用 i/o 观察者的回调函数, 即该例子的 uv__signal_event 函数。
static void uv__signal_handler(int signum) {
...
for (handle = uv__signal_first_handle(signum);
handle != NULL && handle->signum == signum;
handle = RB_NEXT(uv__signal_tree_s, &uv__signal_tree, handle)) {
int r;
msg.signum = signum;
msg.handle = handle;
/* write() should be atomic for small data chunks, so the entire message
* should be written at once. In theory the pipe could become full, in
* which case the user is out of luck.
*/
do {
r = write(handle->loop->signal_pipefd[1], &msg, sizeof msg);
} while (r == -1 && errno == EINTR);
assert(r == sizeof msg ||
(r == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)));
if (r != -1)
handle->caught_signals++;
}
uv__signal_unlock();
errno = saved_errno;
}
复制代码
uv__signal_event
thread1_worker > uv_signal_init > uv__signal_loop_once_init > uv__signal_event
信号 i/o 设置的回调函数。
- 循环读取所有写入的消息, 可能有多条消息。
- 如果该消息的 signum 是需要监听的, 则调用 handle->signal_cb 回调函数。
static void uv__signal_event(uv_loop_t* loop,
uv__io_t* w,
unsigned int events) {
uv__signal_msg_t* msg;
uv_signal_t* handle;
char buf[sizeof(uv__signal_msg_t) * 32];
size_t bytes, end, i;
int r;
bytes = 0;
end = 0;
do {
r = read(loop->signal_pipefd[0], buf + bytes, sizeof(buf) - bytes);
if (r == -1 && errno == EINTR)
continue;
if (r == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
/* If there are bytes in the buffer already (which really is extremely
* unlikely if possible at all) we can't exit the function here. We'll
* spin until more bytes are read instead.
*/
if (bytes > 0)
continue;
/* Otherwise, there was nothing there. */
return;
}
/* Other errors really should never happen. */
if (r == -1)
abort();
bytes += r;
/* `end` is rounded down to a multiple of sizeof(uv__signal_msg_t). */
end = (bytes / sizeof(uv__signal_msg_t)) * sizeof(uv__signal_msg_t);
for (i = 0; i < end; i += sizeof(uv__signal_msg_t)) {
msg = (uv__signal_msg_t*) (buf + i);
handle = msg->handle;
if (msg->signum == handle->signum) {
assert(!(handle->flags & UV_HANDLE_CLOSING));
handle->signal_cb(handle, handle->signum);
}
handle->dispatched_signals++;
if (handle->flags & UV_SIGNAL_ONE_SHOT)
uv__signal_stop(handle);
}
bytes -= end;
/* If there are any "partial" messages left, move them to the start of the
* the buffer, and spin. This should not happen.
*/
if (bytes) {
memmove(buf, buf + end, bytes);
continue;
}
} while (end == sizeof buf);
}
复制代码
小结
只需在第一个调用 uv__signal_start 函数的时候注册一个信号处理函数, 当收到信号时, 该函数会遍历红黑树中所有关注该 signum 的 handle, 然后向该 handle 通过 pipe2 申请的通信 fd 的写端写入数据, 事件循环阶段被 epoll 捕获通知到该 handle 的 i/o 观察者, 最后调用观察者的回调, 达到通知所有监听函数的目的。
阅读更多: github.com/xiaoxiaojx/… 不定时会更新, 感兴趣的大佬可以点个 Star 支持, 谢谢阅读 ~