Search K
Appearance
Appearance
📊 SEO元描述:2024年最新Node.js Worker Threads教程,详解多线程编程、主线程工作线程通信、共享内存。包含完整CPU密集型任务处理和线程池管理,适合高级开发者掌握并行计算。
核心关键词:Node.js Worker Threads2024、多线程编程、并行计算、线程通信、共享内存、CPU密集型任务
长尾关键词:Node.js多线程怎么用、Worker Threads是什么、Node.js并行处理、线程池管理、CPU密集型优化
通过本节Node.js Worker Threads多线程编程,你将系统性掌握:
Worker Threads是什么?这是Node.js 10.5.0引入的多线程解决方案。Worker Threads允许在独立的线程中执行JavaScript代码,也是突破单线程限制的重要技术。
💡 学习建议:Worker Threads适合CPU密集型任务,对于I/O密集型任务,传统的异步模式仍然是最佳选择
// 🎉 基础Worker Threads示例
// main.js - 主线程
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
if (isMainThread) {
// 主线程代码
console.log('主线程启动');
// 创建工作线程
const worker = new Worker(__filename, {
workerData: { start: 0, end: 1000000 }
});
// 监听工作线程消息
worker.on('message', (result) => {
console.log('收到工作线程结果:', result);
});
// 监听工作线程错误
worker.on('error', (error) => {
console.error('工作线程错误:', error);
});
// 监听工作线程退出
worker.on('exit', (code) => {
console.log(`工作线程退出,代码: ${code}`);
});
} else {
// 工作线程代码
console.log('工作线程启动,数据:', workerData);
// 执行CPU密集型任务
function calculateSum(start, end) {
let sum = 0;
for (let i = start; i <= end; i++) {
sum += i;
}
return sum;
}
const result = calculateSum(workerData.start, workerData.end);
// 发送结果到主线程
parentPort.postMessage({
sum: result,
range: `${workerData.start}-${workerData.end}`
});
}// 🎉 分离的工作线程文件
// worker.js - 工作线程文件
const { parentPort, workerData } = require('worker_threads');
// 质数计算函数
function isPrime(n) {
if (n < 2) return false;
for (let i = 2; i <= Math.sqrt(n); i++) {
if (n % i === 0) return false;
}
return true;
}
function findPrimes(start, end) {
const primes = [];
for (let i = start; i <= end; i++) {
if (isPrime(i)) {
primes.push(i);
}
}
return primes;
}
// 处理任务
const { start, end } = workerData;
const primes = findPrimes(start, end);
// 发送结果
parentPort.postMessage({
primes,
count: primes.length,
range: `${start}-${end}`
});
// main.js - 主线程
const { Worker } = require('worker_threads');
const path = require('path');
async function findPrimesParallel(max, threadCount = 4) {
const chunkSize = Math.ceil(max / threadCount);
const workers = [];
const results = [];
for (let i = 0; i < threadCount; i++) {
const start = i * chunkSize + 1;
const end = Math.min((i + 1) * chunkSize, max);
const worker = new Worker(path.join(__dirname, 'worker.js'), {
workerData: { start, end }
});
workers.push(new Promise((resolve, reject) => {
worker.on('message', resolve);
worker.on('error', reject);
}));
}
const allResults = await Promise.all(workers);
// 合并结果
const allPrimes = allResults.flatMap(result => result.primes);
return allPrimes.sort((a, b) => a - b);
}
// 使用示例
findPrimesParallel(10000, 4).then(primes => {
console.log(`找到 ${primes.length} 个质数`);
console.log('前10个质数:', primes.slice(0, 10));
});MessageChannel提供了更灵活的线程间通信方式:
// 🎉 MessageChannel通信示例
const { Worker, MessageChannel, isMainThread, parentPort } = require('worker_threads');
if (isMainThread) {
// 主线程 - 创建消息通道
const { port1, port2 } = new MessageChannel();
const worker = new Worker(__filename, {
transferList: [port2]
});
// 通过port1与工作线程通信
port1.on('message', (message) => {
console.log('主线程收到:', message);
if (message.type === 'request') {
// 响应工作线程请求
port1.postMessage({
type: 'response',
data: `处理结果: ${message.data}`,
timestamp: Date.now()
});
}
});
// 发送初始消息
port1.postMessage({
type: 'init',
config: { maxRetries: 3, timeout: 5000 }
});
} else {
// 工作线程
parentPort.once('message', (port) => {
// 接收MessageChannel端口
port.on('message', (message) => {
console.log('工作线程收到:', message);
if (message.type === 'init') {
// 处理初始化配置
console.log('配置:', message.config);
// 发送请求
port.postMessage({
type: 'request',
data: 'Hello from worker'
});
}
});
});
}// 🎉 双向通信示例
class WorkerManager {
constructor(workerFile) {
this.worker = new Worker(workerFile);
this.messageId = 0;
this.pendingMessages = new Map();
this.worker.on('message', this.handleMessage.bind(this));
this.worker.on('error', this.handleError.bind(this));
}
handleMessage(message) {
if (message.id && this.pendingMessages.has(message.id)) {
const { resolve, reject } = this.pendingMessages.get(message.id);
this.pendingMessages.delete(message.id);
if (message.error) {
reject(new Error(message.error));
} else {
resolve(message.result);
}
}
}
handleError(error) {
console.error('Worker错误:', error);
// 拒绝所有待处理的消息
for (const [id, { reject }] of this.pendingMessages) {
reject(error);
}
this.pendingMessages.clear();
}
async sendTask(task, data) {
return new Promise((resolve, reject) => {
const id = ++this.messageId;
this.pendingMessages.set(id, { resolve, reject });
this.worker.postMessage({
id,
task,
data
});
// 设置超时
setTimeout(() => {
if (this.pendingMessages.has(id)) {
this.pendingMessages.delete(id);
reject(new Error('任务超时'));
}
}, 10000);
});
}
terminate() {
return this.worker.terminate();
}
}SharedArrayBuffer允许在多个线程间共享内存:
// 🎉 SharedArrayBuffer示例
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
if (isMainThread) {
// 创建共享内存
const sharedBuffer = new SharedArrayBuffer(1024);
const sharedArray = new Int32Array(sharedBuffer);
// 初始化数据
for (let i = 0; i < sharedArray.length; i++) {
sharedArray[i] = i;
}
console.log('初始数据:', sharedArray.slice(0, 10));
// 创建多个工作线程
const workers = [];
const workerCount = 4;
for (let i = 0; i < workerCount; i++) {
const worker = new Worker(__filename, {
workerData: {
sharedBuffer,
workerId: i,
workerCount
}
});
worker.on('message', (message) => {
console.log(`Worker ${i}:`, message);
});
workers.push(worker);
}
// 等待所有工作线程完成
setTimeout(() => {
console.log('处理后数据:', sharedArray.slice(0, 10));
workers.forEach(worker => worker.terminate());
}, 2000);
} else {
// 工作线程
const { sharedBuffer, workerId, workerCount } = workerData;
const sharedArray = new Int32Array(sharedBuffer);
// 计算当前线程负责的数据范围
const chunkSize = Math.ceil(sharedArray.length / workerCount);
const start = workerId * chunkSize;
const end = Math.min(start + chunkSize, sharedArray.length);
// 处理数据(每个元素乘以2)
for (let i = start; i < end; i++) {
Atomics.store(sharedArray, i, sharedArray[i] * 2);
}
parentPort.postMessage({
workerId,
processed: end - start,
range: `${start}-${end-1}`
});
}// 🎉 原子操作示例
class SharedCounter {
constructor(workerCount) {
this.buffer = new SharedArrayBuffer(16);
this.counter = new Int32Array(this.buffer);
this.workerCount = workerCount;
// 初始化计数器
Atomics.store(this.counter, 0, 0); // 当前值
Atomics.store(this.counter, 1, 0); // 完成的工作线程数
}
getBuffer() {
return this.buffer;
}
static increment(buffer) {
const counter = new Int32Array(buffer);
return Atomics.add(counter, 0, 1);
}
static markWorkerComplete(buffer) {
const counter = new Int32Array(buffer);
return Atomics.add(counter, 1, 1);
}
static getValue(buffer) {
const counter = new Int32Array(buffer);
return Atomics.load(counter, 0);
}
static getCompletedWorkers(buffer) {
const counter = new Int32Array(buffer);
return Atomics.load(counter, 1);
}
}线程池可以重用工作线程,提高性能:
// 🎉 线程池实现
class WorkerPool {
constructor(workerFile, poolSize = 4) {
this.workerFile = workerFile;
this.poolSize = poolSize;
this.workers = [];
this.availableWorkers = [];
this.taskQueue = [];
this.initializePool();
}
initializePool() {
for (let i = 0; i < this.poolSize; i++) {
const worker = this.createWorker();
this.workers.push(worker);
this.availableWorkers.push(worker);
}
}
createWorker() {
const worker = new Worker(this.workerFile);
worker.on('message', (message) => {
if (worker.currentTask) {
const { resolve } = worker.currentTask;
worker.currentTask = null;
resolve(message);
// 将工作线程返回到可用池
this.availableWorkers.push(worker);
this.processQueue();
}
});
worker.on('error', (error) => {
if (worker.currentTask) {
const { reject } = worker.currentTask;
worker.currentTask = null;
reject(error);
// 重新创建工作线程
this.replaceWorker(worker);
}
});
return worker;
}
replaceWorker(oldWorker) {
const index = this.workers.indexOf(oldWorker);
if (index !== -1) {
oldWorker.terminate();
const newWorker = this.createWorker();
this.workers[index] = newWorker;
this.availableWorkers.push(newWorker);
this.processQueue();
}
}
async execute(data) {
return new Promise((resolve, reject) => {
const task = { data, resolve, reject };
if (this.availableWorkers.length > 0) {
this.assignTask(task);
} else {
this.taskQueue.push(task);
}
});
}
assignTask(task) {
const worker = this.availableWorkers.pop();
worker.currentTask = task;
worker.postMessage(task.data);
}
processQueue() {
while (this.taskQueue.length > 0 && this.availableWorkers.length > 0) {
const task = this.taskQueue.shift();
this.assignTask(task);
}
}
async terminate() {
const terminatePromises = this.workers.map(worker => worker.terminate());
await Promise.all(terminatePromises);
}
getStats() {
return {
totalWorkers: this.workers.length,
availableWorkers: this.availableWorkers.length,
queuedTasks: this.taskQueue.length,
busyWorkers: this.workers.length - this.availableWorkers.length
};
}
}
// 使用线程池
const pool = new WorkerPool('./cpu-worker.js', 4);
async function processLargeDataset(data) {
const chunks = chunkArray(data, 1000);
const promises = chunks.map(chunk => pool.execute(chunk));
const results = await Promise.all(promises);
return results.flat();
}
function chunkArray(array, size) {
const chunks = [];
for (let i = 0; i < array.length; i += size) {
chunks.push(array.slice(i, i + size));
}
return chunks;
}线程池优势:
💼 生产环境建议:根据CPU核心数和任务特性调整线程池大小,通常设置为CPU核心数的1-2倍
通过本节Node.js Worker Threads多线程编程的学习,你已经掌握:
A: 适合CPU密集型任务,如图像处理、数据分析、加密解密、复杂计算等。不适合I/O密集型任务。
A: Worker Threads在同一进程内创建多个线程,共享内存;Cluster创建多个进程,内存隔离,适合不同的使用场景。
A: 通常设置为CPU核心数的1-2倍,具体需要根据任务特性和系统资源进行基准测试确定。
A: 需要浏览器或Node.js版本支持,存在安全考虑,需要正确使用原子操作避免竞态条件。
A: 可以使用console.log输出调试信息,或者使用Node.js的inspector协议进行调试。
// 问题:工作线程内存不释放
// 解决:正确管理线程生命周期
class ManagedWorkerPool {
constructor(workerFile, options = {}) {
this.maxIdleTime = options.maxIdleTime || 30000;
this.checkInterval = options.checkInterval || 5000;
setInterval(() => {
this.cleanupIdleWorkers();
}, this.checkInterval);
}
cleanupIdleWorkers() {
const now = Date.now();
this.availableWorkers = this.availableWorkers.filter(worker => {
if (now - worker.lastUsed > this.maxIdleTime) {
worker.terminate();
return false;
}
return true;
});
}
}// 问题:工作线程任务可能无限期运行
// 解决:实现任务超时机制
async function executeWithTimeout(worker, data, timeout = 10000) {
return Promise.race([
new Promise((resolve, reject) => {
worker.postMessage(data);
worker.once('message', resolve);
worker.once('error', reject);
}),
new Promise((_, reject) => {
setTimeout(() => {
reject(new Error('任务超时'));
}, timeout);
})
]);
}"掌握Worker Threads是构建高性能Node.js应用的关键技能,多线程编程让你的应用能够充分利用现代多核处理器的计算能力!"