了解NodeJs中的多线程

这是我参与8月更文挑战的第5天,活动详情查看:8月更文挑战

Worker Threads

node中的多线程是在node版本v10.5.0引入的一个的一个新特性,在很长一段时间内Worker Thread都是实验性质的,到目前为止,node稳定的版本已经到了14.17.4,这个特性已经变成稳定可用了。

worker_thread是什么

先来来看看官网的描述

The `worker_threads` module enables the use of threads that
execute JavaScript in parallel.
复制代码

意思就是:worker_thread模块允许使用线程来并行执行JavaScript。

我们以前Javascript都是单线程然后利用一个事件循环队列(event loop)不断监听执行栈是否有函数进入。对于worker_thread,其实可以理解为一个event loop中有多个javascript工作线程,创建一个线程相当于创建一个新的js执行环境。多线程的运行如下图

image.png

与child_process的区别

child_process是可以创建一个新的node进程,worker_thread与它的最大区别就是:worker_thread可以共享内存,公共的数据可以在线程之间公用,而child_process只能通过JSON去传递数据。

还有就是:因为线程是在一个进程内的,创建一个线程的开销会比创建一个进程要

worker_thread的适用范围

worker_threadCPU密集型的JS操作中非常有用,但是在IO密集型操作中性能不会有太多的改善,反而Node自带的一步IO操作会比工作线程更有用。

使用worker_thread

介绍完worker_thread的概念,现在来介绍一下他的用法

创建工作线程

const { isMainThread, Worker, parentPort } = require('worker_threads');

if (isMainThread) {
  createChildThread();
} else {
  parentPort.on('message', (msg) => {
    console.log('parent thread listen:', msg);
  })
  parentPort.postMessage('hello child thread')
  
}

// 创建线程方法
function createChildThread() {
  const worker = new Worker(__filename)
  worker.on('message', (val) => {
    console.log('child thread listen:', val);
    worker.postMessage('hello parent thread');
  })
}
复制代码

上面代码在主线程的时候调用创建工作线程的方法,创建一个工作线程并且新增一个,而创建工作线程后调用主线程的端口向工作线程发送消息,工作线程接受到消息后再向主线程回应。大概的流程图如下

image.png

执行后返回

创建线程前
image.png

创建线程后,线程数量+1
image.png

image.png

线程间通信

上文提到,worker_threads可以通过ArrayBufferSharedArrayBuffer共享内存。接下来看一下,它在代码中是如何实现的.

const { isMainThread, Worker, parentPort } = require('worker_threads');

if (isMainThread) {
  createChildThread();
} else {
  // 创建一个长度为4byte的SharedArrayBuffer
  const shareBuf = new SharedArrayBuffer(4);
  // 创建一个8位无符号整型数组
  const bufInt = new Uint8Array(shareBuf);
  parentPort.on('message', () => {
    console.log('parent thread listen:', bufInt);
  });
  // 发送共享内存创建的整型
  parentPort.postMessage({ bufInt });
}

// 创建线程方法
function createChildThread() {
  const worker = new Worker(__filename);
  worker.on('message', ({ bufInt }) => {
    console.log('child thread listen:', bufInt);
    bufInt[0] = 11;
    worker.postMessage('finished');
  });
}
复制代码

运行后返回,可以看到在子线程创建一个SharedArrayBuffer,用主线程广播的一个数据,在子线程中接收后赋值,因为是线程间共享的Buffer,所以主线程这边也可以看到在子线程中修改的数据。
image.png

如图所示,使用SharedArrayBuffer创建的值会分配到共享内存中,所有线程都可以共用这块内存。
image.png

线程间通信

我们已经学会创建线程使用共享内存了,从上面代码可以看到,线程都是从主线程中发送消息,然后子线程向主线程回复消息,没有办法让两个子线程直接直接通信,如果我想让两个子线程直接通信,那就需要用到MessageChannel这个类了,MessageChannel的具体用法可以点击这里。现在就来实现一下子线程之间的直接通信。

const {
  isMainThread,
  Worker,
  parentPort,
  MessageChannel,
  threadId,
} = require('worker_threads');

if (isMainThread) {
  createChildThread();
} else {
  parentPort.on('message', ({ port }) => {
    // 主线程接收到端口后配置通信端口方法
    port.on('message', (msg) => {
      console.log(`port${threadId} listen:`, msg);
    });
    port.postMessage(`hello, im thread ${threadId}`);
  });
  parentPort.postMessage('hello');
}

// 创建线程方法
function createChildThread() {
  const { port1, port2 } = new MessageChannel(); // 创建一个MessageChannel
  const worker1 = new Worker(__filename); // 创建子线程1
  const worker2 = new Worker(__filename); // 创建子线程2
  worker2.postMessage({ port: port1 }, [port1]); // 向主线程发送Channel的端口1
  worker1.postMessage({ port: port2 }, [port2]); // 向主线程发送Channel的端口1
}
复制代码

运行代码后返回
image.png

从代码实现可以看到,最终建立子线程直接通信的步骤还是在主线程的message事件中。建立通信的流程图如下

image.png

实战

了解完线程的概念和用法,现在来实战一下:比如在数组中有100万条数据需要md5加密,对比一下使用工作线程和不使用工作线程的实现速度怎么样

const {
  isMainThread,
  Worker,
  parentPort,
  threadId,
} = require('worker_threads');
const { createHash } = require('crypto');

const ARR_NUM = 1000000; // 数组长度
const WORKER_NUM = 1; // 线程数
const size = Math.ceil(ARR_NUM / WORKER_NUM); // 每个线程需要处理的数据量

if (isMainThread) {
  createChildThread();
} else {
  parentPort.on('message', ({ status, index, startTime }) => {
    if (index === WORKER_NUM) {
      const usedTime = Date.now() - startTime;
      console.log(`finish bussiness time: ${usedTime}ms`);
      process.exit(threadId);
    }
  });
  const data = addHasCode(threadId, size, (threadId - 1) * size);
  // 完成后
  parentPort.postMessage({
    business: 'finish work',
    data,
  });
}

// 创建线程方法
function createChildThread() {
  let finishNumBuf = new SharedArrayBuffer(4);
  let finishNum = new Uint8Array(finishNumBuf);
  const startTime = Date.now();
  for (let x = 0; x < WORKER_NUM; x++) {
    const worker = new Worker(__filename, {});
    worker.on('message', ({ business, data }) => {
      if (business === 'finish work') {
        finishNum[0]++;
        worker.postMessage({
          status: `finish worker ${x}`,
          index: finishNum[0],
          startTime: startTime,
          data,
        });
      }
    });
  }

  console.log(`${WORKER_NUM} thread start working`);
}

// 加密方法
function addHasCode(index, size, limit) {
  const result = [];
  for (let x = limit, num = index * size; x < num; x++) {
    result.push(createHash('md5').update('hello world').digest('base64'));
  }
  return result;
}
复制代码

使用1个线程计算,平均需要2700ms左右
image.png

使用5个线程计算,平均需要2000ms左右
image.png

使用20个线程计算,平均需要2500ms左右
image.png

使用60个线程计算,平均需要3800ms左右
image.png

可见,线程不是越多越好,过多的线程可能会增加过多的系统开销,速度也不如单线程时候运行。

小结

本文介绍了nodeJs中的worker_threads的概念,去多进程的区别,和它的优点。
介绍了worker_threads是如何使用,共享内存,还有子线程之间的通信。
最后用一个测试子进程的效率的例子说明worker_threads对比单线程运行。

若文章中有不严谨或出错的地方请在评论区域指出~

参考

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享