开发环境:Node.JS v14.8.0
整理自之前的《NodeJS多线程通信和共享内存 - worker_threads》&&《NodeJS多线程模块研究 - worker_threads》两篇文章。
快速开始
js 和 nodejs 一直都是单线程,直到官方推出了 worker_threads 模块,用来解决 CPU 密集型计算场景。
可以通过以下代码快速开启一个工作线程:
1 2 3 4 5 6 7 8 9 10 11 12 13
| if (isMainThread) { new Worker(__filename); console.log("在主进程中"); console.log("isMainThread is", isMainThread); } else { console.log("在工作线程中"); console.log("isMainThread is", isMainThread); }
|
上面代码的输出是:
1 2 3 4
| 在主进程中 isMainThread is true 在工作线程中 isMainThread is false
|
多线程通信
父子线程通信-parentPort
master thread 中,Worker
返回的对象,代表着工作线程实例。
worker thread 中,require('worker_threads').parentPort
就是 master thread 的 MessagePort。
利用上面说到的两个对象,可以实现主线程和工作线程之间的“双工通信”。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| const { Worker, isMainThread, parentPort } = require("worker_threads");
if (isMainThread) { const worker = new Worker(__filename); worker.on("message", (message) => console.log(message)); worker.postMessage("ping"); } else { parentPort.on("message", (message) => parentPort.postMessage({ pong: message }) ); }
|
上面输出:
1 2 3 4
| { pong: "ping"; } // handling
|
node 的设计参考了浏览器的 Web Worker
任意线程通信-MessageChannel
通信管道(MessageChannel)可以跨任何线程(比如多个工作线程之间)进行数据通信。对比上面通过require('worker_threads').parentPort
的方式,缺点是只能在主线程和工作线程之间进行通信。
管道通信的思路:
- 通过
MessageChannel
创建通信管道
- 通过
postMessage
传递数据
- 第一个参数是传递的数据,包括通信管道的
Messageport
- 第二个参数是
transformList
,放入其中的对象,将在管道发送端中无法使用
来看一段利用通信管道,兄弟线程通信的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| const { isMainThread, parentPort, threadId, MessageChannel, Worker } = require('worker_threads');
if (isMainThread) { const worker1 = new Worker(__filename); const worker2 = new Worker(__filename); const subChannel = new MessageChannel(); worker1.postMessage({ yourPort: subChannel.port1 }, [subChannel.port1]); worker2.postMessage({ yourPort: subChannel.port2 }, [subChannel.port2]); } else { parentPort.once('message', (value) => { value.yourPort.postMessage('hello'); value.yourPort.on('message', msg => { console.log(`thread ${threadId}: receive ${msg}`); }); }); }
|
控制台输出:
1 2
| thread 1: receive hello thread 2: receive hell
|
共享内存通信-SharedArrayBuffer
nodejs 多线程也可以通过“共享内存”进行通信。master thread 和 worker threads 都操作同一段物理存储上的数据,实现数据共享,并且避免了拷贝带来的开销。
共享内存通信的思路:
- 通过
SharedArrayBuffer
创建二进制数据
SharedArrayBuffer
数据不能放在 postMessage
的第二个参数中
来看一段通过共享内存进行通信的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| const assert = require("assert"); const { Worker, MessageChannel, MessagePort, isMainThread, parentPort, } = require("worker_threads"); if (isMainThread) { const worker = new Worker(__filename); const subChannel = new MessageChannel(); const sharedArray = new SharedArrayBuffer(4); const uint8Arr = new Uint8Array(sharedArray); console.log("[master] 原来的uint8Arr", uint8Arr);
worker.postMessage({ hereIsYourPort: subChannel.port1, uint8Arr }, [ subChannel.port1, ]);
subChannel.port2.on("message", (msg) => { console.log("[master] 经过工作线程修改的uint8Arr", uint8Arr); }); } else { parentPort.on("message", (value) => { assert(value.hereIsYourPort instanceof MessagePort); value.uint8Arr[1] = 10; console.log("[worker] 修改出来的uint8Arr", value.uint8Arr); value.hereIsYourPort.postMessage(""); value.hereIsYourPort.close(); }); }
|
上面代码的输出是:
1 2 3 4
| [master] 原来的uint8Arr Uint8Array(4) [ 0, 0, 0, 0 ] [worker] 修改出来的uint8Arr Uint8Array(4) [ 0, 10, 0, 0 ] [master] 经过工作线程修改的uint8Arr Uint8Array(4) [ 0, 10, 0, 0 ]
|
可以看到,worker thread 修改了 uint8Arr,由于采用共享内存,因此在 master thread 中,从自己的上下文中,也读取到修改后的 uint8Arr。
worker_threads 效果评测
单线程计算
以一个生成素数的计算为例,直接跑2-1e7范围内的素数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| const min = 2; const max = 1e7; const primes = []; function generatePrimes(start, range) { let isPrime = true; let end = start + range; for (let i = start; i < end; i++) { for (let j = min; j < Math.sqrt(end); j++) { if (i !== j && i%j === 0) { isPrime = false; break; } } if (isPrime) { primes.push(i); } isPrime = true; } } generatePrimes(min, max);
|
通过 time
命令能看到,这里一共耗时10.36s,单核cpu占用率达到了99%
工作线程计算
主线程负责切割数据,将数据等分后,下发给工作线程。
工作线程从 workerData 上,读取计算范围,同时计算范围内的素数。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| const { Worker, isMainThread, parentPort, workerData } = require('worker_threads'); const min = 2; let primes = []; function generatePrimes(start, range) { let isPrime = true; let end = start + range; for (let i = start; i < end; i++) { for (let j = min; j < Math.sqrt(end); j++) { if (i !== j && i%j === 0) { isPrime = false; break; } } if (isPrime) { primes.push(i); } isPrime = true; } } if (isMainThread) { const max = 1e7; const threadCount = +process.argv[2] || 4; const threads = new Set();; console.log(`Running with ${threadCount} threads...`); const range = Math.ceil((max - min) / threadCount); let start = min; for (let i = 0; i < threadCount - 1; i++) { const myStart = start; threads.add(new Worker(__filename, { workerData: { start: myStart, range }})); start += range; } threads.add(new Worker(__filename, { workerData: { start, range: range + ((max - min + 1) % threadCount)}})); for (let worker of threads) { worker.on('error', (err) => { throw err; }); worker.on('exit', () => { threads.delete(worker); console.log(`Thread exiting, ${threads.size} running...`); if (threads.size === 0) { } }) worker.on('message', (msg) => { primes = primes.concat(msg); }); } } else { generatePrimes(workerData.start, workerData.range); parentPort.postMessage(primes); }
|
在本地开启4个工作线程,并且本地6核CPU不处于忙碌的状态,最终耗时是2.658s,缩短了4倍。CPU利用率是 314%,,调用了3+CPU资源来支持本次计算:
可以看到,优化效果非常明显,能充分利用多核CPU的优势。
worker_threads 的底层模型
对于单线程,Nodejs 是由以下部分组成:
- 一个进程
- 一个线程
- 一个事件循环
- 一个 JS 引擎实例
- 一个 Node.js 实例
对于工作线程,Nodejs 组成变成了:
- 一个进程
- 多个线程
- 每个线程独立的事件循环
- 每个线程独立的 JS 引擎实例
- 每个线程独立的 Node.js 实例
如下图所示:
Q & A
多线程是否可以并行?workder_threads 能否提高CPU利用率?
并发是快速切换任务,本质还是串行执行;并行是同时执行任务。
而多线程可以并发,也可以并行,这取决于是否能抢占到 CPU 资源。这个过程是操作系统来调度,不可人为控制。
但总的来说,多线程和多进程类似,都可以 CPU 的利用率。
workder_threads和cluster/child_process的区别
child_process是nodejs的多进程模块。cluster是在child_process上封装的集群模块,提供了多进程并行下的各种API。他们都是基于进程模型。
worker_threads 基于线程模型,开销小,更加轻量,并且在同一个进程下,还可以通过共享内存来通信。适合解决CPU密集问题。
参考文章