Search K
Appearance
Appearance
📊 SEO元描述:2024年最新Node.js子进程教程,详解spawn、exec、fork方法、进程间通信、标准输入输出重定向。包含完整进程池管理和任务调度,适合高级开发者掌握多进程编程。
核心关键词:Node.js子进程2024、Child Process、spawn、exec、fork、进程间通信、多进程编程
长尾关键词:Node.js子进程怎么用、spawn和exec区别、进程间通信方法、子进程管理、Node.js多进程
通过本节Node.js子进程Child Process,你将系统性掌握:
子进程(Child Process)是什么?`child_process` 是Node.js中用于创建和管理子进程的核心模块。它允许Node.js应用启动其他程序或脚本,是突破单进程限制的重要技术。
💡 学习建议:子进程适合CPU密集型任务和系统集成,与Worker Threads相比更适合调用外部程序
**spawn()**是最基础的子进程创建方法:
// 🎉 Basic spawn() usage example
const { spawn } = require('child_process');

// Start a child process that runs a system command
const ls = spawn('ls', ['-la', '/usr']);

// Forward both output streams of the child to the console
ls.stdout.on('data', (chunk) => console.log(`stdout: ${chunk}`));
ls.stderr.on('data', (chunk) => console.error(`stderr: ${chunk}`));

// Lifecycle events: 'close' reports the exit code, 'error' reports a failed launch
ls
  .on('close', (exitCode) => {
    console.log(`子进程退出,退出码: ${exitCode}`);
  })
  .on('error', (spawnError) => {
    console.error('启动子进程失败:', spawnError);
  });
// 🎉 Advanced spawn() implementation
const { spawn } = require('child_process');
const EventEmitter = require('events');
/**
 * Spawns child processes and tracks their lifecycle in an in-memory registry.
 *
 * Events emitted:
 *  - `processCreated` (processInfo)   after a child has been spawned
 *  - `processOutput`  (processId, 'stdout' | 'stderr', chunk)
 *  - `processClose`   (processInfo)   when the child's 'close' event fires
 *  - `processError`   (processInfo)   when the child's 'error' event fires
 */
class ProcessManager extends EventEmitter {
  constructor() {
    super();
    // processId -> processInfo record
    this.processes = new Map();
    this.processCounter = 0;
  }

  /**
   * Spawns `command` and registers it with the manager.
   *
   * @param {string} command - Executable to run.
   * @param {string[]} [args=[]] - Arguments for the executable.
   * @param {object} [options={}] - `spawn()` options. `env` is merged on top of
   *   `process.env`; `timeout` (ms, default 30000) is enforced by the manager.
   * @returns {object} The processInfo record (id, command, args, process,
   *   startTime, status, and later exitCode/signal/endTime/error).
   * @throws Re-throws whatever `spawn()` throws synchronously.
   */
  createProcess(command, args = [], options = {}) {
    const processId = ++this.processCounter;
    const timeout = options.timeout || 30000;

    // Computed keys come AFTER the caller's options so that a caller-supplied
    // `env` extends the inherited environment instead of replacing it.
    const spawnOptions = {
      stdio: ['pipe', 'pipe', 'pipe'], // stdin, stdout, stderr
      ...options,
      env: { ...process.env, ...options.env },
      cwd: options.cwd || process.cwd(),
    };
    // The manager's own timer below is the single timeout authority; forwarding
    // `timeout` to spawn() as well would arm a second, competing kill timer.
    delete spawnOptions.timeout;

    try {
      const childProcess = spawn(command, args, spawnOptions);
      const processInfo = {
        id: processId,
        command,
        args,
        process: childProcess,
        startTime: new Date(),
        status: 'running',
      };
      this.processes.set(processId, processInfo);

      // Force-kill the child if it is still running when the timeout elapses
      const timeoutTimer = setTimeout(() => {
        if (processInfo.status === 'running') {
          console.log(`进程 ${processId} 超时,强制终止`);
          childProcess.kill('SIGKILL');
        }
      }, timeout);

      childProcess.on('close', (code, signal) => {
        clearTimeout(timeoutTimer);
        // Keep an earlier 'error' status visible instead of overwriting it
        if (processInfo.status !== 'error') {
          processInfo.status = 'closed';
        }
        processInfo.exitCode = code;
        processInfo.signal = signal;
        processInfo.endTime = new Date();
        console.log(`进程 ${processId} 结束: code=${code}, signal=${signal}`);
        this.emit('processClose', processInfo);
      });

      childProcess.on('error', (err) => {
        clearTimeout(timeoutTimer);
        processInfo.status = 'error';
        processInfo.error = err;
        console.error(`进程 ${processId} 错误:`, err.message);
        this.emit('processError', processInfo);
      });

      // Re-emit output chunks; the streams are absent when stdio is not piped
      if (childProcess.stdout) {
        childProcess.stdout.on('data', (data) => {
          this.emit('processOutput', processId, 'stdout', data);
        });
      }
      if (childProcess.stderr) {
        childProcess.stderr.on('data', (data) => {
          this.emit('processOutput', processId, 'stderr', data);
        });
      }

      console.log(`创建进程 ${processId}: ${command} ${args.join(' ')}`);
      this.emit('processCreated', processInfo);
      return processInfo;
    } catch (error) {
      console.error('创建进程失败:', error.message);
      throw error;
    }
  }

  /**
   * @param {number} processId
   * @returns {object|undefined} The processInfo record, if registered.
   */
  getProcess(processId) {
    return this.processes.get(processId);
  }

  /**
   * Sends `signal` to a running process.
   *
   * @param {number} processId
   * @param {string} [signal='SIGTERM']
   * @returns {boolean} true when the signal was sent, false when the process
   *   is unknown or no longer running.
   */
  killProcess(processId, signal = 'SIGTERM') {
    const processInfo = this.processes.get(processId);
    if (!processInfo || processInfo.status !== 'running') {
      return false;
    }
    processInfo.process.kill(signal);
    return true;
  }

  /**
   * Writes `data` to the child's stdin.
   *
   * @param {number} processId
   * @param {string|Buffer} data
   * @returns {boolean} true when the data was handed to a writable stdin,
   *   false when the process is unknown or its stdin is missing/closed.
   */
  writeToProcess(processId, data) {
    const processInfo = this.processes.get(processId);
    const stdin = processInfo ? processInfo.process.stdin : null;
    // Writing to an ended/destroyed stream would raise an 'error' event
    if (!stdin || !stdin.writable) {
      return false;
    }
    stdin.write(data);
    return true;
  }

  /**
   * @returns {object[]} Plain summaries (no ChildProcess handle) of every
   *   registered process.
   */
  getProcessList() {
    return Array.from(this.processes.values(), (info) => ({
      id: info.id,
      command: info.command,
      args: info.args,
      status: info.status,
      startTime: info.startTime,
      endTime: info.endTime,
      exitCode: info.exitCode,
    }));
  }

  /** Sends SIGTERM to every running process and empties the registry. */
  cleanup() {
    for (const processInfo of this.processes.values()) {
      if (processInfo.status === 'running') {
        processInfo.process.kill('SIGTERM');
      }
    }
    this.processes.clear();
  }
}
// 使用进程管理器
const pm = new ProcessManager();
pm.on('processCreated', (info) => {
console.log(`进程已创建: ${info.id}`);
});
pm.on('processOutput', (id, type, data) => {
console.log(`[${id}][${type}] ${data.toString().trim()}`);
});
pm.on('processClose', (info) => {
console.log(`进程已关闭: ${info.id}, 运行时间: ${info.endTime - info.startTime}ms`);
});
// 创建示例进程
const proc1 = pm.createProcess('ping', ['google.com', '-c', '3']);
const proc2 = pm.createProcess('node', ['--version']);**exec()**方法适合执行shell命令:
// 🎉 exec()方法使用
const { exec, execFile } = require('child_process');
const { promisify } = require('util');
const execAsync = promisify(exec);
const execFileAsync = promisify(execFile);
/**
 * Runs shell commands (`exec`) and executables (`execFile`) and reports the
 * outcome as a plain result object instead of throwing.
 *
 * Relies on `execAsync` / `execFileAsync` (promisified `exec` / `execFile`)
 * being defined in the enclosing module scope.
 */
class CommandExecutor {
  /**
   * @param {object} [options={}]
   * @param {number} [options.timeout=10000] - Default timeout in ms. An
   *   explicit 0 is honoured (it disables the timeout for exec/execFile).
   * @param {number} [options.maxBuffer=1048576] - Default output cap in bytes.
   */
  constructor(options = {}) {
    this.defaultTimeout = CommandExecutor.valueOr(options.timeout, 10000);
    this.defaultMaxBuffer = options.maxBuffer || 1024 * 1024; // 1MB
  }

  /** Returns `value` unless it is null/undefined, in which case `fallback`. */
  static valueOr(value, fallback) {
    return value === undefined || value === null ? fallback : value;
  }

  /**
   * Trims string output. Non-string output (a Buffer when the caller asked
   * for `encoding: 'buffer'`) is passed through; missing output becomes ''.
   */
  static normalizeOutput(output) {
    if (typeof output === 'string') {
      return output.trim();
    }
    return output || '';
  }

  /** Builds the options object shared by exec and execFile. */
  buildExecOptions(options = {}) {
    return {
      timeout: CommandExecutor.valueOr(options.timeout, this.defaultTimeout),
      maxBuffer: options.maxBuffer || this.defaultMaxBuffer,
      encoding: options.encoding || 'utf8',
      env: { ...process.env, ...options.env },
      cwd: options.cwd || process.cwd(),
    };
  }

  /**
   * Awaits `invoke()` and converts its outcome into a result object.
   * `identity` carries the fields that describe what was run.
   */
  async run(invoke, identity) {
    const startTime = Date.now();
    try {
      const { stdout, stderr } = await invoke();
      return {
        success: true,
        stdout: CommandExecutor.normalizeOutput(stdout),
        stderr: CommandExecutor.normalizeOutput(stderr),
        duration: Date.now() - startTime,
        ...identity,
      };
    } catch (error) {
      return {
        success: false,
        error: error.message,
        code: error.code,
        signal: error.signal,
        stdout: CommandExecutor.normalizeOutput(error.stdout),
        stderr: CommandExecutor.normalizeOutput(error.stderr),
        duration: Date.now() - startTime,
        ...identity,
      };
    }
  }

  /**
   * Runs `command` through the shell.
   *
   * @param {string} command
   * @param {object} [options={}] - Per-call overrides (timeout, maxBuffer,
   *   encoding, env, cwd).
   * @returns {Promise<object>} `{ success, stdout, stderr, duration, command }`
   *   plus `error`, `code`, `signal` on failure. Never rejects for a failed
   *   command.
   */
  async executeCommand(command, options = {}) {
    const execOptions = this.buildExecOptions(options);
    console.log(`执行命令: ${command}`);
    return this.run(() => execAsync(command, execOptions), { command });
  }

  /**
   * Runs `file` directly, without a shell.
   *
   * @param {string} file
   * @param {string[]} [args=[]]
   * @param {object} [options={}] - Per-call overrides, as for executeCommand.
   * @returns {Promise<object>} `{ success, stdout, stderr, duration, file, args }`
   *   plus `error`, `code`, `signal` on failure.
   */
  async executeFile(file, args = [], options = {}) {
    const execOptions = this.buildExecOptions(options);
    console.log(`执行文件: ${file} ${args.join(' ')}`);
    return this.run(() => execFileAsync(file, args, execOptions), { file, args });
  }

  /**
   * Runs the entries one after another and collects their results in order.
   *
   * @param {Array<string|object>} commands - A string is a shell command;
   *   `{ type: 'file', file, args, options }` runs an executable; any other
   *   object is `{ command, options }`.
   * @returns {Promise<object[]>}
   */
  async batchExecute(commands) {
    const results = [];
    for (const cmd of commands) {
      let result;
      if (typeof cmd === 'string') {
        result = await this.executeCommand(cmd);
      } else if (cmd.type === 'file') {
        result = await this.executeFile(cmd.file, cmd.args, cmd.options);
      } else {
        result = await this.executeCommand(cmd.command, cmd.options);
      }
      results.push(result);
    }
    return results;
  }
}
// 使用命令执行器
const executor = new CommandExecutor({ timeout: 5000 });
async function runCommands() {
// 单个命令执行
const result1 = await executor.executeCommand('ls -la');
console.log('命令结果:', result1);
// 文件执行
const result2 = await executor.executeFile('node', ['--version']);
console.log('文件执行结果:', result2);
// 批量执行
const batchResults = await executor.batchExecute([
'pwd',
'whoami',
{ command: 'echo "Hello World"' },
{ type: 'file', file: 'node', args: ['-e', 'console.log(process.version)'] }
]);
console.log('批量执行结果:', batchResults);
}
runCommands().catch(console.error);**fork()**专门用于创建Node.js子进程:
// 🎉 fork()方法和进程间通信
// worker.js - 子进程文件
if (process.send) {
// 子进程代码
console.log('子进程启动, PID:', process.pid);
// 监听父进程消息
process.on('message', (message) => {
console.log('子进程收到消息:', message);
switch (message.type) {
case 'task':
handleTask(message.data);
break;
case 'ping':
process.send({ type: 'pong', timestamp: Date.now() });
break;
case 'shutdown':
console.log('子进程准备关闭');
process.exit(0);
break;
}
});
/**
 * Runs a simulated CPU-bound job and reports the outcome to the parent.
 *
 * Sums Math.sqrt(i) for i in [0, data.iterations) and sends a `taskResult`
 * message carrying the sum, the elapsed milliseconds and the iteration count.
 *
 * @param {{ iterations: number }} data
 */
function handleTask(data) {
  const startedAt = Date.now();
  const { iterations } = data;

  let total = 0;
  let i = 0;
  while (i < iterations) {
    total += Math.sqrt(i);
    i += 1;
  }

  const elapsed = Date.now() - startedAt;

  // Report the result back to the parent process
  const payload = {
    type: 'taskResult',
    result: {
      value: total,
      duration: elapsed,
      iterations,
    },
  };
  process.send(payload);
}
// Send a heartbeat to the parent every 5 seconds
const sendHeartbeat = () => {
  const heartbeat = {
    type: 'heartbeat',
    pid: process.pid,
    memory: process.memoryUsage(),
    uptime: process.uptime(),
  };
  process.send(heartbeat);
};
setInterval(sendHeartbeat, 5000);
} else {
// 主进程代码
const { fork } = require('child_process');
const path = require('path');
/**
 * Keeps a fixed number of forked worker processes alive and feeds them tasks
 * from a FIFO queue, one task per worker at a time.
 */
class WorkerManager {
  /**
   * @param {string} workerFile - Script passed to `fork()` for each worker.
   * @param {object} [options={}]
   * @param {number} [options.maxWorkers=4] - Number of workers to keep alive.
   */
  constructor(workerFile, options = {}) {
    this.workerFile = workerFile;
    this.maxWorkers = options.maxWorkers || 4;
    this.workers = [];
    this.taskQueue = [];
    this.taskCounter = 0;
    this.pendingTasks = new Map();
    // Monotonic id source: deriving ids from workers.length would hand out
    // duplicate ids once a worker has been removed and another created.
    this.workerCounter = 0;
    // Completed-task total that survives worker removal
    this.completedTasks = 0;
    this.shuttingDown = false;
    this.queueTimer = null;
    this.forceKillTimer = null;
  }

  /** Forks `maxWorkers` workers and starts polling the task queue. */
  start() {
    console.log(`启动 ${this.maxWorkers} 个工作进程`);
    for (let i = 0; i < this.maxWorkers; i++) {
      this.createWorker();
    }
    // Poll the queue periodically; the handle is kept so shutdown() can stop it
    if (this.queueTimer === null) {
      this.queueTimer = setInterval(() => {
        this.processTaskQueue();
      }, 100);
    }
  }

  /** Forks one worker, wires its events and adds it to the pool. */
  createWorker() {
    const worker = fork(this.workerFile, [], {
      silent: false,
      env: process.env,
    });
    const workerInfo = {
      id: this.workerCounter++,
      process: worker,
      busy: false,
      tasks: 0,
      currentTask: null,
      startTime: new Date(),
    };

    worker.on('message', (message) => {
      this.handleWorkerMessage(workerInfo, message);
    });
    worker.on('exit', (code, signal) => {
      this.handleWorkerExit(workerInfo, code, signal);
    });
    worker.on('error', (err) => {
      console.error(`工作进程 ${workerInfo.id} 错误:`, err.message);
    });

    this.workers.push(workerInfo);
    console.log(`创建工作进程 ${workerInfo.id}, PID: ${worker.pid}`);
  }

  /**
   * Handles a worker exit: drops it from the pool and, unless the manager is
   * shutting down, re-queues its in-flight task and forks a replacement.
   */
  handleWorkerExit(workerInfo, code, signal) {
    console.log(`工作进程 ${workerInfo.id} 退出: code=${code}, signal=${signal}`);
    this.removeWorker(workerInfo);

    if (this.shuttingDown) {
      // Respawning here would make shutdown() never converge. Once the last
      // worker is gone the pending force-kill has nothing left to do.
      if (this.workers.length === 0 && this.forceKillTimer !== null) {
        clearTimeout(this.forceKillTimer);
        this.forceKillTimer = null;
      }
      return;
    }

    const lostTask = workerInfo.currentTask;
    workerInfo.currentTask = null;
    if (lostTask) {
      // Retry once; a task that kills two workers in a row is dropped so it
      // cannot crash-loop the pool.
      if (lostTask.attempts < 2) {
        console.warn(`任务 ${lostTask.id} 因工作进程退出重新入队`);
        this.taskQueue.unshift(lostTask);
      } else {
        console.error(`任务 ${lostTask.id} 多次导致工作进程退出,已放弃`);
      }
    }

    if (this.workers.length < this.maxWorkers) {
      this.createWorker();
    }
  }

  /** Dispatches a message received from a worker by its `type`. */
  handleWorkerMessage(workerInfo, message) {
    switch (message.type) {
      case 'taskResult':
        this.handleTaskResult(workerInfo, message.result);
        break;
      case 'heartbeat':
        console.log(`工作进程 ${workerInfo.id} 心跳:`, message);
        break;
      case 'pong':
        console.log(`工作进程 ${workerInfo.id} 响应ping`);
        break;
    }
  }

  /** Marks the worker idle again and immediately tries to hand out more work. */
  handleTaskResult(workerInfo, result) {
    workerInfo.busy = false;
    workerInfo.currentTask = null;
    workerInfo.tasks++;
    this.completedTasks++;
    console.log(`任务完成 - 工作进程 ${workerInfo.id}:`, result);
    this.processTaskQueue();
  }

  /**
   * Appends a task to the queue.
   *
   * @param {*} taskData - Payload sent to the worker as `data`.
   * @returns {number} The id assigned to the task.
   */
  addTask(taskData) {
    const taskId = ++this.taskCounter;
    const task = {
      id: taskId,
      data: taskData,
      timestamp: new Date(),
      attempts: 0,
    };
    this.taskQueue.push(task);
    console.log(`添加任务 ${taskId} 到队列`);
    return taskId;
  }

  /** Hands queued tasks to idle workers until either runs out. */
  processTaskQueue() {
    if (this.shuttingDown) return;
    while (this.taskQueue.length > 0) {
      const availableWorker = this.workers.find((w) => !w.busy);
      if (!availableWorker) return;

      const task = this.taskQueue.shift();
      task.attempts = (task.attempts || 0) + 1;
      availableWorker.busy = true;
      availableWorker.currentTask = task;
      console.log(`分配任务 ${task.id} 给工作进程 ${availableWorker.id}`);
      availableWorker.process.send({
        type: 'task',
        data: task.data,
      });
    }
  }

  /** Removes a worker record from the pool, if present. */
  removeWorker(workerInfo) {
    const index = this.workers.indexOf(workerInfo);
    if (index !== -1) {
      this.workers.splice(index, 1);
    }
  }

  /**
   * @returns {{workers: number, busyWorkers: number, queuedTasks: number, totalTasks: number}}
   *   `totalTasks` counts every completed task, including those handled by
   *   workers that have since exited.
   */
  getStats() {
    return {
      workers: this.workers.length,
      busyWorkers: this.workers.filter((w) => w.busy).length,
      queuedTasks: this.taskQueue.length,
      totalTasks: this.completedTasks,
    };
  }

  /**
   * Asks every worker to shut down, then sends SIGTERM after 5 seconds to any
   * worker that is still registered. Stops queue polling and worker respawn.
   */
  shutdown() {
    console.log('关闭所有工作进程...');
    this.shuttingDown = true;
    if (this.queueTimer !== null) {
      clearInterval(this.queueTimer);
      this.queueTimer = null;
    }

    this.workers.forEach((worker) => {
      worker.process.send({ type: 'shutdown' });
    });

    this.forceKillTimer = setTimeout(() => {
      this.forceKillTimer = null;
      this.workers.forEach((worker) => {
        if (!worker.process.killed) {
          worker.process.kill('SIGTERM');
        }
      });
    }, 5000);
  }
}
// 使用工作进程管理器
const manager = new WorkerManager(__filename, { maxWorkers: 2 });
manager.start();
// 添加测试任务
for (let i = 0; i < 5; i++) {
manager.addTask({
iterations: 1000000 * (i + 1),
name: `Task ${i + 1}`
});
}
// 定期输出统计信息
setInterval(() => {
console.log('管理器统计:', manager.getStats());
}, 3000);
// 优雅关闭
process.on('SIGINT', () => {
console.log('收到SIGINT信号,准备关闭...');
manager.shutdown();
setTimeout(() => process.exit(0), 6000);
});
}进程池可以重用子进程,提高性能:
// 🎉 高效进程池实现
/**
 * A pool of forked worker processes that executes tasks with a per-task
 * timeout and recycles workers after a configurable number of tasks.
 *
 * Worker protocol (messages over the IPC channel):
 *  - worker -> pool: `{ type: 'ready' }` as its first message,
 *    `{ type: 'taskComplete', taskId, result }`,
 *    `{ type: 'taskError', taskId, error }`, `{ type: 'heartbeat' }`
 *  - pool -> worker: `{ type: 'task', taskId, data }`, `{ type: 'shutdown' }`
 */
class ProcessPool {
  /**
   * @param {object} [options={}]
   * @param {number} [options.poolSize=4]
   * @param {string} options.workerScript - Script passed to `fork()`.
   * @param {number} [options.maxTasksPerWorker=100] - Tasks handled before a
   *   worker is replaced.
   * @param {number} [options.taskTimeout=30000] - Per-task timeout in ms.
   */
  constructor(options = {}) {
    this.poolSize = options.poolSize || 4;
    this.workerScript = options.workerScript;
    this.maxTasksPerWorker = options.maxTasksPerWorker || 100;
    this.taskTimeout = options.taskTimeout || 30000;
    this.workers = [];
    this.availableWorkers = [];
    this.taskQueue = [];
    this.activeTasks = new Map();
    this.taskCounter = 0;
    this.shuttingDown = false;
    this.stats = {
      tasksCompleted: 0,
      tasksErrored: 0,
      workersCreated: 0,
      workersDestroyed: 0,
    };
  }

  /**
   * Starts `poolSize` workers one after another.
   * @throws Rejects if any worker fails to start.
   */
  async initialize() {
    console.log(`初始化进程池,大小: ${this.poolSize}`);
    for (let i = 0; i < this.poolSize; i++) {
      await this.createWorker();
    }
    console.log('进程池初始化完成');
  }

  /**
   * Forks one worker and waits for its `ready` message.
   *
   * @returns {Promise<object>} Resolves with the worker record once the worker
   *   is ready; rejects if the first message is not `ready`, if the worker
   *   exits or errors before becoming ready, or after 5 seconds without a
   *   `ready` message. A worker that fails startup is killed.
   */
  createWorker() {
    return new Promise((resolve, reject) => {
      // Local require: this snippet has no module-level `fork` binding
      const { fork } = require('child_process');
      // NOTE(review): with `silent: true` the worker's stdout/stderr are piped
      // to this process but never read here; a chatty worker may need them
      // drained by the caller.
      const worker = fork(this.workerScript, [], {
        silent: true,
        env: process.env,
      });
      const workerInfo = {
        id: this.stats.workersCreated++,
        process: worker,
        busy: false,
        ready: false,
        retired: false,
        currentTaskId: null,
        tasksHandled: 0,
        createdAt: new Date(),
        lastUsed: new Date(),
      };

      // Single exit path for every startup failure, safe to call repeatedly
      const failStartup = (error) => {
        if (workerInfo.ready || workerInfo.retired) return;
        clearTimeout(startupTimer);
        this.retireWorker(workerInfo, error.message);
        worker.kill();
        reject(error);
      };
      const startupTimer = setTimeout(() => {
        failStartup(new Error('工作进程启动超时'));
      }, 5000);

      worker.on('message', (message) => {
        if (workerInfo.retired) return;
        if (workerInfo.ready) {
          this.handleWorkerMessage(workerInfo, message);
          return;
        }
        // The first message decides whether startup succeeded
        if (message && message.type === 'ready') {
          workerInfo.ready = true;
          clearTimeout(startupTimer);
          this.workers.push(workerInfo);
          this.availableWorkers.push(workerInfo);
          console.log(`工作进程 ${workerInfo.id} 已就绪, PID: ${worker.pid}`);
          resolve(workerInfo);
          // A replacement worker must pick up tasks queued while it started
          this.processTaskQueue();
        } else {
          failStartup(new Error('工作进程启动失败'));
        }
      });

      worker.on('exit', (code, signal) => {
        if (!workerInfo.ready) {
          failStartup(new Error('工作进程启动失败'));
          return;
        }
        this.handleWorkerExit(workerInfo, code, signal);
      });

      worker.on('error', (err) => {
        console.error(`工作进程 ${workerInfo.id} 错误:`, err.message);
        if (!workerInfo.ready) {
          failStartup(err);
          return;
        }
        this.replaceWorker(workerInfo);
      });
    });
  }

  /** Dispatches a message from a ready worker by its `type`. */
  handleWorkerMessage(workerInfo, message) {
    switch (message.type) {
      case 'taskComplete':
        this.handleTaskComplete(workerInfo, message);
        break;
      case 'taskError':
        this.handleTaskError(workerInfo, message);
        break;
      case 'heartbeat':
        workerInfo.lastUsed = new Date();
        break;
    }
  }

  /** Removes a task from the active set and cancels its timeout timer. */
  takeActiveTask(taskId) {
    const task = this.activeTasks.get(taskId);
    if (!task) return undefined;
    clearTimeout(task.timeout);
    this.activeTasks.delete(taskId);
    return task;
  }

  /** Frees the worker if `taskId` is the task it is currently running. */
  finishWorkerTask(workerInfo, taskId) {
    if (!workerInfo.busy || workerInfo.currentTaskId !== taskId) return;
    workerInfo.currentTaskId = null;
    this.releaseWorker(workerInfo);
  }

  /** Resolves the task's promise with the worker's result. */
  handleTaskComplete(workerInfo, message) {
    const task = this.takeActiveTask(message.taskId);
    if (task) {
      task.resolve(message.result);
      this.stats.tasksCompleted++;
    }
    this.finishWorkerTask(workerInfo, message.taskId);
  }

  /** Rejects the task's promise with the error reported by the worker. */
  handleTaskError(workerInfo, message) {
    const task = this.takeActiveTask(message.taskId);
    if (task) {
      task.reject(new Error(message.error));
      this.stats.tasksErrored++;
    }
    this.finishWorkerTask(workerInfo, message.taskId);
  }

  /**
   * Rejects a task whose timeout elapsed. A still-queued task is removed from
   * the queue; a task already running causes its worker to be replaced so the
   * pool does not keep waiting on it.
   */
  handleTaskTimeout(task) {
    if (!this.activeTasks.has(task.id)) return;
    this.activeTasks.delete(task.id);
    this.stats.tasksErrored++;
    task.reject(new Error(`任务 ${task.id} 超时`));

    const queuedIndex = this.taskQueue.indexOf(task);
    if (queuedIndex !== -1) {
      this.taskQueue.splice(queuedIndex, 1);
      return;
    }
    const stuckWorker = this.workers.find((w) => w.currentTaskId === task.id);
    if (stuckWorker) {
      this.replaceWorker(stuckWorker);
    }
  }

  /**
   * Returns a worker to the idle list, or replaces it once it has handled
   * `maxTasksPerWorker` tasks.
   */
  releaseWorker(workerInfo) {
    workerInfo.busy = false;
    workerInfo.tasksHandled++;
    if (workerInfo.retired) return;

    if (workerInfo.tasksHandled >= this.maxTasksPerWorker) {
      this.replaceWorker(workerInfo);
      return;
    }
    if (!this.availableWorkers.includes(workerInfo)) {
      this.availableWorkers.push(workerInfo);
    }
    this.processTaskQueue();
  }

  /**
   * Takes a worker out of the pool exactly once and fails the task it was
   * running, if that task is still active.
   *
   * @returns {boolean} false when the worker had already been retired.
   */
  retireWorker(workerInfo, reason) {
    if (workerInfo.retired) return false;
    workerInfo.retired = true;

    const workerIndex = this.workers.indexOf(workerInfo);
    if (workerIndex !== -1) {
      this.workers.splice(workerIndex, 1);
    }
    const availableIndex = this.availableWorkers.indexOf(workerInfo);
    if (availableIndex !== -1) {
      this.availableWorkers.splice(availableIndex, 1);
    }
    this.stats.workersDestroyed++;

    const orphan = this.takeActiveTask(workerInfo.currentTaskId);
    workerInfo.currentTaskId = null;
    workerInfo.busy = false;
    if (orphan) {
      orphan.reject(new Error(reason));
      this.stats.tasksErrored++;
    }
    return true;
  }

  /**
   * Reacts to an unexpected exit of a ready worker: retires it and, unless
   * the pool is shutting down, starts a replacement.
   */
  handleWorkerExit(workerInfo, code, signal) {
    const reason = `工作进程 ${workerInfo.id} 意外退出: code=${code}, signal=${signal}`;
    // false means replaceWorker()/shutdown already dealt with this worker
    if (!this.retireWorker(workerInfo, reason)) return;
    if (this.shuttingDown) return;

    console.warn(reason);
    this.createWorker().catch((error) => {
      console.error('替换工作进程失败:', error.message);
    });
  }

  /**
   * Kills `oldWorker` and starts a new worker in its place. Start failures
   * are logged, not thrown.
   */
  async replaceWorker(oldWorker) {
    console.log(`替换工作进程 ${oldWorker.id}`);
    this.retireWorker(oldWorker, `工作进程 ${oldWorker.id} 已被替换`);
    oldWorker.process.kill();
    if (this.shuttingDown) return;

    try {
      await this.createWorker();
    } catch (error) {
      console.error('替换工作进程失败:', error.message);
    }
  }

  /**
   * Queues a task and resolves with the worker's result.
   *
   * @param {*} taskData - Payload sent to the worker as `data`.
   * @returns {Promise<*>} Rejects on worker-reported error, timeout, loss of
   *   the worker, or when the pool is shutting down.
   */
  execute(taskData) {
    return new Promise((resolve, reject) => {
      if (this.shuttingDown) {
        reject(new Error('进程池正在关闭,无法接受新任务'));
        return;
      }
      const taskId = ++this.taskCounter;
      const task = {
        id: taskId,
        data: taskData,
        resolve,
        reject,
        createdAt: new Date(),
      };
      task.timeout = setTimeout(() => {
        this.handleTaskTimeout(task);
      }, this.taskTimeout);

      this.activeTasks.set(taskId, task);
      this.taskQueue.push(task);
      this.processTaskQueue();
    });
  }

  /** Pairs queued tasks with idle workers until either list is empty. */
  processTaskQueue() {
    while (this.taskQueue.length > 0 && this.availableWorkers.length > 0) {
      const worker = this.availableWorkers.shift();
      if (worker.retired) continue;

      const task = this.taskQueue.shift();
      worker.busy = true;
      worker.lastUsed = new Date();
      worker.currentTaskId = task.id;
      worker.process.send({
        type: 'task',
        taskId: task.id,
        data: task.data,
      });
    }
  }

  /** @returns {object} Lifetime counters plus a snapshot of the pool state. */
  getStats() {
    return {
      ...this.stats,
      poolSize: this.workers.length,
      availableWorkers: this.availableWorkers.length,
      busyWorkers: this.workers.filter((w) => w.busy).length,
      queuedTasks: this.taskQueue.length,
      activeTasks: this.activeTasks.size,
    };
  }

  /**
   * Waits for active tasks to settle, then asks every worker to shut down and
   * force-kills (SIGKILL) any that has not exited within 5 seconds.
   */
  async shutdown() {
    console.log('关闭进程池...');
    this.shuttingDown = true;

    // Every accepted task stays in activeTasks until it settles or times out
    while (this.activeTasks.size > 0) {
      await new Promise((resolve) => setTimeout(resolve, 100));
    }

    const shutdownPromises = this.workers.map((worker) => {
      return new Promise((resolve) => {
        const forceKillTimer = setTimeout(() => {
          worker.process.kill('SIGKILL');
          resolve();
        }, 5000);
        worker.process.once('exit', () => {
          // Without this the timer would keep the event loop alive for 5s
          clearTimeout(forceKillTimer);
          resolve();
        });
        if (worker.process.connected === false) {
          // No IPC channel left to deliver the request on
          worker.process.kill();
        } else {
          worker.process.send({ type: 'shutdown' });
        }
      });
    });
    await Promise.all(shutdownPromises);

    this.workers = [];
    this.availableWorkers = [];
    console.log('进程池已关闭');
  }
}
// Process pool advantages (进程池优势):
💼 生产环境建议:根据CPU核心数和任务特性调整进程池大小,监控内存使用情况,实现进程健康检查
通过本节Node.js子进程Child Process的学习,你已经掌握:
A: spawn适合流式处理,exec适合简单命令执行,fork专门用于Node.js子进程,支持进程间通信。
A: 通常设置为CPU核心数的1-2倍,具体需要根据任务类型和系统资源进行测试调优。
A: 监听exit和error事件,实现自动重启机制,记录错误日志,必要时降级处理。
A: IPC消息传递、共享文件、管道、Socket等,Node.js主要使用IPC消息传递。
A: 监控CPU使用率、内存占用、任务处理时间、错误率等指标,实现健康检查机制。
// Problem: child processes turn into zombie processes
// Solution: handle process exit correctly

/**
 * Extracts the zombie-process lines from `ps aux` output.
 *
 * A line counts as a zombie when its STAT column (8th whitespace-separated
 * field of `ps aux`) starts with `Z`. Matching on the column instead of a
 * substring avoids false positives from commands whose arguments merely
 * contain "Z+" or "<defunct>".
 *
 * @param {string} psOutput - Raw text printed by `ps aux`.
 * @returns {string[]} The matching lines.
 */
function findZombieLines(psOutput) {
  return psOutput.split('\n').filter((line) => {
    const fields = line.trim().split(/\s+/);
    return fields.length > 7 && fields[7].startsWith('Z');
  });
}

/** Runs `ps aux` once and warns when zombie processes are listed. */
function scanForZombies() {
  const { spawn } = require('child_process');
  const ps = spawn('ps', ['aux']);
  let output = '';

  ps.stdout.on('data', (data) => {
    output += data;
  });

  // Without a listener a failed launch (e.g. no `ps` binary) would surface as
  // an unhandled 'error' event
  ps.on('error', (err) => {
    console.error('僵尸进程检查失败:', err.message);
  });

  ps.on('close', () => {
    const zombies = findZombieLines(output);
    if (zombies.length > 0) {
      console.warn('发现僵尸进程:', zombies.length);
    }
  });
}

/**
 * Periodically scans the process table for zombie processes and logs a
 * warning with their count. It only reports; it does not reap anything.
 *
 * @param {number} [intervalMs=30000] - Time between scans.
 * @returns {object} The interval handle; pass it to `clearInterval()` to stop.
 */
function cleanupZombieProcesses(intervalMs = 30000) {
  return setInterval(scanForZombies, intervalMs);
}
// Problem: child-process memory leak
// Solution: implement memory monitoring
/**
 * Watches the resident memory (RSS) of registered processes and sends SIGTERM
 * to any that exceeds the threshold.
 *
 * Memory is read from `process.memoryUsage().rss` when the registered object
 * provides that method. Objects without it (such as `ChildProcess` handles)
 * are checked against the value last supplied through `reportMemory()`, for
 * example from a worker heartbeat that carries `process.memoryUsage()`.
 */
class MemoryMonitor {
  /**
   * @param {number} [threshold=104857600] - RSS limit in bytes (100MB).
   */
  constructor(threshold = 100 * 1024 * 1024) {
    this.threshold = threshold;
    this.processes = new Map();
    this.monitorTimer = null;
  }

  /**
   * Registers a process under `id`.
   *
   * @param {*} id - Key used in log output and for `reportMemory()`.
   * @param {object} process - Object exposing `kill(signal)` and, optionally,
   *   `memoryUsage()`.
   */
  addProcess(id, process) {
    const canMeasure = typeof process.memoryUsage === 'function';
    this.processes.set(id, {
      process,
      startMemory: canMeasure ? process.memoryUsage().rss : 0,
      maxMemory: 0,
      reportedMemory: null,
    });
  }

  /**
   * Records an externally measured RSS value for a registered process.
   *
   * @param {*} id
   * @param {number} rss - Resident set size in bytes.
   * @returns {boolean} false when `id` is not registered.
   */
  reportMemory(id, rss) {
    const info = this.processes.get(id);
    if (!info) return false;
    info.reportedMemory = rss;
    return true;
  }

  /** Returns the current RSS for a record, or null when none is known. */
  readMemory(info) {
    if (typeof info.process.memoryUsage === 'function') {
      return info.process.memoryUsage().rss;
    }
    return info.reportedMemory;
  }

  /**
   * Checks every registered process once. A process above the threshold is
   * sent SIGTERM and removed from the monitor.
   */
  checkMemory() {
    for (const [id, info] of this.processes) {
      const currentMemory = this.readMemory(info);
      if (typeof currentMemory !== 'number') continue;

      info.maxMemory = Math.max(info.maxMemory, currentMemory);
      if (currentMemory > this.threshold) {
        console.warn(`进程 ${id} 内存使用过高: ${Math.round(currentMemory / 1024 / 1024)}MB`);
        // The caller may choose to restart the process after this
        info.process.kill('SIGTERM');
        this.processes.delete(id);
      }
    }
  }

  /**
   * Starts periodic checks, replacing any previous schedule.
   *
   * @param {number} [interval=10000] - Milliseconds between checks.
   * @returns {object} The interval handle.
   */
  startMonitoring(interval = 10000) {
    this.stopMonitoring();
    this.monitorTimer = setInterval(() => {
      this.checkMemory();
    }, interval);
    return this.monitorTimer;
  }

  /** Stops periodic checks started by `startMonitoring()`. */
  stopMonitoring() {
    if (this.monitorTimer !== null) {
      clearInterval(this.monitorTimer);
      this.monitorTimer = null;
    }
  }
}
"掌握子进程是构建高性能Node.js应用的重要技能,多进程编程让你的应用能够充分利用系统资源,处理更复杂的任务!"