JavaScript 如何运行计算密集型任务
这篇文章,我打算从 js 有哪些实现计算密集型任务的手段出发,延伸到进程和线程,再到 golang, 从而对并发有一个深刻的认知。
需求是这样的,一个 Nodejs web 后端,需要执行一个数据处理步骤,在内存中进行,非常耗时。项目用 pm2 起了四个服务,用 pm2 的观测命令可以看到,每次场景会把一个服务的 cpu 占满,运行四次就会把四个服务都占满,后面整个服务就瘫痪了,无法处理新的请求。
我们都知道,js 是基于事件循环机制来实现非堵塞 IO 效果的,且 js 是单线程运行。这意味着,JS 一个时刻只能处理一件事。如果一个任务占据了 js 线程过长时间,js 就不能处理后续的请求,表现就是服务挂掉了。因此,cpu 密集任务在 js 中是一个致命问题。
worker threads
和 child process
既然这一切是单线程的锅,那么如果有新的线程或者进程来处理这个 cpu 密集任务,把主 js 线程解放出来,这样服务就能正常处理请求了。所以,引入 js 中两个处理并发计算的工具:worker threads
和child process
。顾名思义,前者是新线程,后者是新进程。
这里顺便复习一下线程和进程的关系。就记住一句话,进程大于线程,进程之间互相隔离,线程之间共享内存。一个 Nodejs 进程,默认只有一个主线程,不过也有别的线程,比如 I/O 线程,还有我们即将谈到的工作线程(worker threads
)。
废话少说,直接上代码。这里我们定义一个朴实无华的 cpu 密集任务,即 1e10 级别的累加任务:
// CPU 密集型任务:计算 1 到 N 的总和
function heavyComputation(n) {
let sum = 0;
for (let i = 1; i <= n; i++) {
sum += 1;
}
return sum;
}
worker threads
worker
的代码如下:
// worker.js
const { parentPort, workerData } = require("worker_threads");
// CPU 密集型任务:计算 1 到 N 的总和
function heavyComputation(n) {
let sum = 0;
for (let i = 1; i <= n; i++) {
sum += 1;
}
return sum;
}
// 计算并发送结果回主线程
const result = heavyComputation(workerData);
parentPort.postMessage(result);
// -----------------
// main.js
const { Worker } = require("worker_threads");
function runWorker(workerData) {
return new Promise((resolve, reject) => {
const worker = new Worker("./worker.js", { workerData });
worker.on("message", resolve); // 接收子线程的结果
worker.on("error", reject); // 子线程发生错误
worker.on("exit", (code) => {
if (code !== 0) {
reject(new Error(`Worker stopped with exit code ${code}`));
}
});
});
}
// 调用 worker 执行 CPU 密集型任务
(async () => {
try {
console.time("1");
console.log("Main thread: Starting CPU-intensive task...");
const result = await runWorker(1e10); // 计算 1 到 100000000 的总和
console.log(`Main thread: Task result is ${result}`);
console.timeEnd("1");
} catch (error) {
console.error("Error:", error);
}
})();
console.log("other task");
执行结果:
Main thread: Starting CPU-intensive task...
other task
Main thread: Task result is 10000000000
1: 11.216s
注意上面这种写法是把 worker 包进一个 promise 里面,这样 js 就把 runWorker 作为一个异步事件处理,并放入事件队列。当 resolve 被调用时,说明 worker 执行完毕,js 就会把回调事件 resolve 从任务队列中取出执行。在上面的代码中,worker.on("message", resolve)
等同于worker.on("message", (result)=>{resolve(result)})
。
回到 worker,它就是启动一个线程用于执行 cpu 密集任务,从而将主线程解放出来。
child process
// compute.js
process.on("message", (n) => {
// CPU 密集型任务:计算 1 到 N 的总和
function heavyComputation(n) {
let sum = 0;
for (let i = 1; i <= n; i++) {
sum += 1;
}
return sum;
}
const result = heavyComputation(n);
process.send(result); // 将结果发送回主进程
});
// ------------------------
// main.js
const { fork } = require("child_process");
// 创建子进程
const computeProcess = fork("./compute.js");
// 监听子进程消息
computeProcess.on("message", (result) => {
console.log(`Main process: Task result is ${result}`);
console.timeEnd("1");
computeProcess.kill(); // 任务完成后杀死子进程
});
// 发送任务数据到子进程
console.time("1");
console.log("Main process: Starting CPU-intensive task...");
computeProcess.send(1e10);
console.log("other task");
打印结果:
Main process: Starting CPU-intensive task...
other task
Main process: Task result is 10000000000
1: 11.116s
可以看出,两者执行事件大致相同。
两者比较
进程由于其相互间的隔离性较好,所以适合在执行外部程序或脚本时使用,或者需要 standard input/output 作为消息传递的场景。
线程则因其上下文切换开销小,故而是 cpu 密集任务的首选。