Search K
Appearance
Appearance
📊 SEO元描述:2024年最新Node.js集群教程,详解Cluster模块、多进程架构、负载均衡、进程监控与自动重启。包含PM2进程管理器的完整使用方法,适合高级开发者掌握生产级多进程部署。
核心关键词:Node.js集群2024、Cluster模块、多进程架构、负载均衡、进程监控、PM2进程管理器
长尾关键词:Node.js集群怎么用、Cluster负载均衡、多进程部署、PM2使用教程、Node.js生产环境
通过本节Node.js集群Cluster,你将系统性掌握:
Node.js集群(Cluster)是什么?它是Node.js内置的多进程管理模块,允许主进程创建多个共享同一服务器端口的工作进程(子进程)。集群模式是构建高性能、高可用Node.js应用的核心技术。注意:自Node.js 16起,`cluster.isMaster`已更名为`cluster.isPrimary`,旧名称仍可使用但已被弃用。
💡 学习建议:集群模式是生产环境部署的标准做法,理解其原理对于构建企业级应用至关重要
// 🎉 Basic cluster example
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

// Delay before replacing a crashed worker, so a worker that dies during
// startup cannot turn into a tight fork/crash loop.
const RESTART_DELAY_MS = 1000;

// cluster.isMaster was renamed to cluster.isPrimary in Node.js 16 (the old
// name is deprecated); fall back to it so older runtimes still work.
if (cluster.isPrimary ?? cluster.isMaster) {
  console.log(`主进程 ${process.pid} 正在运行`);

  // Fork one worker per CPU core.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  // Replace workers that exit unexpectedly.
  cluster.on('exit', (worker) => {
    console.log(`工作进程 ${worker.process.pid} 已退出`);
    // exitedAfterDisconnect is true when the exit was requested through
    // worker.disconnect()/worker.kill(); those workers are not respawned.
    if (worker.exitedAfterDisconnect) {
      return;
    }
    console.log('启动新的工作进程...');
    setTimeout(() => cluster.fork(), RESTART_DELAY_MS);
  });

  // Log when a worker comes online.
  cluster.on('online', (worker) => {
    console.log(`工作进程 ${worker.process.pid} 已上线`);
  });

  // Log when a worker's IPC channel disconnects.
  cluster.on('disconnect', (worker) => {
    console.log(`工作进程 ${worker.process.pid} 已断开连接`);
  });
} else {
  // Worker process: every worker listens on the shared port 3000.
  const server = http.createServer((req, res) => {
    res.writeHead(200);
    res.end(`Hello from worker ${process.pid}\n`);
  });
  server.listen(3000, () => {
    console.log(`工作进程 ${process.pid} 监听端口 3000`);
  });
}
// 高级集群管理器提供更完善的功能:
// 🎉 高级集群管理器实现
const cluster = require('cluster');
const os = require('os');
const EventEmitter = require('events');
/**
 * Supervises a pool of cluster workers from the primary process. It:
 * - forks the workers;
 * - replaces workers that exit unexpectedly (at most `maxRestarts` times per
 *   worker slot);
 * - sends periodic IPC health checks;
 * - logs statistics once a minute;
 * - supports graceful shutdown and rolling reloads.
 *
 * Emits: 'started', 'workerFork', 'workerOnline', 'workerListening',
 * 'workerDisconnect', 'workerExit', 'workerMaxRestarts', 'workerError',
 * 'workerMessage', 'shutdown', 'reloadComplete'.
 */
class ClusterManager extends EventEmitter {
  /**
   * @param {object} [options]
   * @param {string} [options.workerFile] - Script to fork as the worker. When
   *   omitted, cluster's default (the current entry script) is used.
   * @param {number} [options.workerCount] - Number of workers; falsy values
   *   fall back to `os.cpus().length`.
   * @param {number} [options.maxRestarts=10] - Restarts allowed per worker slot.
   * @param {number} [options.restartDelay=1000] - Delay (ms) before a dead
   *   worker is replaced.
   * @param {number} [options.gracefulTimeout=30000] - Time (ms) to wait for
   *   workers during shutdown before sending SIGKILL.
   */
  constructor(options = {}) {
    super();
    // workerFile is only applied in start() when the caller actually set it,
    // so the default keeps forking the current script as before.
    this.hasCustomWorkerFile = Boolean(options.workerFile);
    this.workerFile = options.workerFile || './worker.js';
    this.workerCount = options.workerCount || os.cpus().length;
    // `??` (not `||`) so explicit zeros such as `restartDelay: 0` are honoured.
    this.maxRestarts = options.maxRestarts ?? 10;
    this.restartDelay = options.restartDelay ?? 1000;
    this.gracefulTimeout = options.gracefulTimeout ?? 30000;
    // worker.id -> info record for every live worker
    this.workers = new Map();
    // worker.id -> number of restarts that preceded this worker in its slot
    this.restartCounts = new Map();
    // ids of workers being retired by reload(); their exit must not trigger a restart
    this.retiringWorkers = new Set();
    this.isShuttingDown = false;
    this.stats = {
      totalRequests: 0,
      totalRestarts: 0,
      startTime: new Date()
    };
  }

  /**
   * Forks the initial workers and installs event, shutdown and stats handlers.
   * @throws {Error} When called outside the primary process.
   */
  start() {
    // cluster.isMaster was renamed to isPrimary in Node.js 16 (old name deprecated).
    if (!(cluster.isPrimary ?? cluster.isMaster)) {
      throw new Error('ClusterManager只能在主进程中运行');
    }
    console.log(`集群管理器启动,主进程 PID: ${process.pid}`);
    console.log(`计划启动 ${this.workerCount} 个工作进程`);

    if (this.hasCustomWorkerFile) {
      // setupPrimary() is the Node.js 16+ name of setupMaster().
      const setup = cluster.setupPrimary ?? cluster.setupMaster;
      setup.call(cluster, { exec: this.workerFile });
    }

    // Round-robin scheduling, set before the first fork() below.
    cluster.schedulingPolicy = cluster.SCHED_RR;

    this.setupClusterEvents();
    for (let i = 0; i < this.workerCount; i++) {
      this.createWorker();
    }
    this.setupGracefulShutdown();
    this.startStatsReporting();
    this.emit('started');
  }

  /** Logs cluster lifecycle events and re-emits them on this manager. */
  setupClusterEvents() {
    cluster.on('fork', (worker) => {
      console.log(`创建工作进程: ${worker.process.pid}`);
      this.emit('workerFork', worker);
    });
    cluster.on('online', (worker) => {
      console.log(`工作进程上线: ${worker.process.pid}`);
      this.emit('workerOnline', worker);
    });
    cluster.on('listening', (worker, address) => {
      console.log(`工作进程监听: ${worker.process.pid} -> ${address.address}:${address.port}`);
      this.emit('workerListening', worker, address);
    });
    cluster.on('disconnect', (worker) => {
      console.log(`工作进程断开连接: ${worker.process.pid}`);
      this.emit('workerDisconnect', worker);
    });
    cluster.on('exit', (worker, code, signal) => {
      this.handleWorkerExit(worker, code, signal);
    });
    cluster.on('message', (worker, message) => {
      this.handleWorkerMessage(worker, message);
    });
  }

  /**
   * Forks one worker and starts tracking it.
   * @param {number} [restarts=0] - Restarts that preceded this worker in its
   *   slot; used to enforce `maxRestarts` across worker generations.
   * @returns {cluster.Worker} The new worker.
   */
  createWorker(restarts = 0) {
    const worker = cluster.fork();
    this.workers.set(worker.id, {
      id: worker.id,
      pid: worker.process.pid,
      worker,
      startTime: new Date(),
      requests: 0,
      restarts,
      memory: null
    });
    this.restartCounts.set(worker.id, restarts);
    this.setupWorkerTimeout(worker);
    return worker;
  }

  /**
   * Runs a health check on the worker every 30 s.
   * - Sends a 'health-check' message and kills the worker with SIGTERM if no
   *   'health-response' arrives within 5 s.
   * - Stores a response's `memory` field, when present, for getWorkerStats().
   * @param {cluster.Worker} worker
   */
  setupWorkerTimeout(worker) {
    let responseTimer = null;
    let responseHandler = null;

    // Cancels the outstanding probe, if any (timer and message listener).
    const clearProbe = () => {
      if (responseTimer !== null) {
        clearTimeout(responseTimer);
        responseTimer = null;
      }
      if (responseHandler !== null) {
        worker.removeListener('message', responseHandler);
        responseHandler = null;
      }
    };

    const healthCheckInterval = setInterval(() => {
      // A dead or disconnected worker cannot answer, and send() would fail.
      if (worker.isDead() || !worker.isConnected()) {
        clearInterval(healthCheckInterval);
        clearProbe();
        return;
      }
      clearProbe();

      // Listen with on() and remove the listener explicitly. once() would be
      // consumed by the first message of ANY type (e.g. 'request-count'); the
      // check would then time out and kill a healthy worker.
      responseHandler = (message) => {
        if (message?.type !== 'health-response') return;
        const workerInfo = this.workers.get(worker.id);
        if (workerInfo && message.memory) {
          workerInfo.memory = message.memory;
        }
        clearProbe();
      };
      worker.on('message', responseHandler);

      responseTimer = setTimeout(() => {
        clearProbe();
        console.warn(`工作进程 ${worker.process.pid} 健康检查超时`);
        // Signal the child directly. In the primary, worker.kill() first waits
        // for an IPC disconnect, which an unresponsive worker may never complete.
        worker.process.kill('SIGTERM');
      }, 5000);

      worker.send({ type: 'health-check', timestamp: Date.now() });
    }, 30000); // check every 30 s

    worker.on('exit', () => {
      clearInterval(healthCheckInterval);
      clearProbe();
    });
  }

  /**
   * Forgets an exited worker and schedules its replacement unless the exit was
   * caused by shutdown or by a rolling reload.
   */
  handleWorkerExit(worker, code, signal) {
    const workerId = worker.id;
    const workerInfo = this.workers.get(workerId);
    if (workerInfo) {
      const uptime = new Date() - workerInfo.startTime;
      console.log(`工作进程退出: PID ${worker.process.pid}, 运行时间: ${uptime}ms, 退出码: ${code}, 信号: ${signal}`);
      this.workers.delete(workerId);
      this.emit('workerExit', worker, code, signal);
    }

    // reload() already started this worker's replacement; restarting it here
    // as well would grow the pool.
    if (this.retiringWorkers.delete(workerId)) {
      this.restartCounts.delete(workerId);
      return;
    }

    if (!this.isShuttingDown) {
      this.restartWorker(workerId);
    }
  }

  /**
   * Replaces the exited worker `workerId` after `restartDelay` ms, unless its
   * slot has already used up `maxRestarts`.
   */
  restartWorker(workerId) {
    const restartCount = this.restartCounts.get(workerId) || 0;
    // Cluster worker ids are never reused, so the dead id's entry can go.
    this.restartCounts.delete(workerId);

    if (restartCount >= this.maxRestarts) {
      console.error(`工作进程 ${workerId} 重启次数过多 (${restartCount}), 停止重启`);
      this.emit('workerMaxRestarts', workerId);
      return;
    }

    console.log(`${this.restartDelay}ms后重启工作进程 ${workerId} (第${restartCount + 1}次)`);
    setTimeout(() => {
      if (!this.isShuttingDown) {
        this.stats.totalRestarts++;
        // The replacement inherits the slot's count, so maxRestarts is
        // enforced across generations (new workers get fresh ids).
        this.createWorker(restartCount + 1);
      }
    }, this.restartDelay);
  }

  /** Dispatches an IPC message received from a worker by its `type`. */
  handleWorkerMessage(worker, message) {
    // Workers may send plain strings or null; only typed objects are handled.
    if (message === null || typeof message !== 'object') return;

    switch (message.type) {
      case 'request-count': {
        const workerInfo = this.workers.get(worker.id);
        if (workerInfo) {
          workerInfo.requests = message.count;
          this.stats.totalRequests += message.increment || 0;
        }
        break;
      }
      case 'error':
        console.error(`工作进程 ${worker.process.pid} 错误:`, message.error);
        this.emit('workerError', worker, message.error);
        break;
      case 'custom':
        this.emit('workerMessage', worker, message.data);
        break;
    }
  }

  /** Sends `message` to every worker whose IPC channel is still open. */
  broadcast(message) {
    for (const { worker } of this.workers.values()) {
      // send() needs a connected IPC channel (dead workers are disconnected too).
      if (worker.isConnected()) {
        worker.send(message);
      }
    }
  }

  /** @returns {object[]} Per-worker snapshot: id, pid, uptime, requests, restarts, memory. */
  getWorkerStats() {
    const now = new Date();
    return Array.from(this.workers.values(), (workerInfo) => ({
      id: workerInfo.id,
      pid: workerInfo.pid,
      uptime: now - workerInfo.startTime,
      requests: workerInfo.requests,
      restarts: workerInfo.restarts,
      // Last memoryUsage() the worker reported in a health response, if any.
      memory: workerInfo.memory ?? null
    }));
  }

  /** @returns {object} Aggregate counters plus uptime and live/target worker counts. */
  getClusterStats() {
    return {
      ...this.stats,
      uptime: new Date() - this.stats.startTime,
      workers: this.workers.size,
      totalWorkers: this.workerCount
    };
  }

  /** Logs cluster and per-worker statistics once a minute. */
  startStatsReporting() {
    setInterval(() => {
      const stats = this.getClusterStats();
      const workers = this.getWorkerStats();
      console.log('\n=== 集群统计 ===');
      console.log(`运行时间: ${Math.round(stats.uptime / 1000)}s`);
      console.log(`活跃工作进程: ${stats.workers}/${stats.totalWorkers}`);
      console.log(`总请求数: ${stats.totalRequests}`);
      console.log(`总重启数: ${stats.totalRestarts}`);
      console.log('\n=== 工作进程详情 ===');
      workers.forEach((worker) => {
        console.log(`PID ${worker.pid}: 请求=${worker.requests}, 运行时间=${Math.round(worker.uptime / 1000)}s, 重启=${worker.restarts}次`);
      });
      console.log('==================\n');
    }, 60000); // once per minute
  }

  /** Starts a graceful shutdown on SIGTERM or SIGINT. */
  setupGracefulShutdown() {
    const shutdown = (signal) => {
      console.log(`\n收到 ${signal} 信号,开始优雅关闭...`);
      this.gracefulShutdown();
    };
    process.on('SIGTERM', () => shutdown('SIGTERM'));
    process.on('SIGINT', () => shutdown('SIGINT'));
  }

  /**
   * Disconnects all workers and waits for them to exit. Any worker still alive
   * after `gracefulTimeout` ms is sent SIGKILL. Then the primary exits with code 0.
   */
  async gracefulShutdown() {
    if (this.isShuttingDown) return;
    this.isShuttingDown = true;
    this.emit('shutdown');
    console.log('断开所有工作进程连接...');

    const workers = Array.from(this.workers.values(), (workerInfo) => workerInfo.worker);

    // Register exit listeners before disconnecting so no exit is missed.
    const shutdownPromises = workers.map((worker) => new Promise((resolve) => {
      if (worker.isDead()) {
        resolve();
        return;
      }
      const forceKillTimer = setTimeout(() => {
        if (!worker.isDead()) {
          console.log(`强制终止工作进程 ${worker.process.pid}`);
          // Signal the child directly. worker.kill() would wait for the very
          // disconnect that has not completed.
          worker.process.kill('SIGKILL');
        }
        resolve();
      }, this.gracefulTimeout);
      worker.once('exit', () => {
        clearTimeout(forceKillTimer);
        resolve();
      });
    }));

    for (const worker of workers) {
      // Only connected workers can be asked to disconnect over IPC.
      if (worker.isConnected()) {
        worker.disconnect();
      }
    }

    await Promise.all(shutdownPromises);
    console.log('所有工作进程已关闭');
    process.exit(0);
  }

  /**
   * Rolling restart, one worker at a time. For each worker:
   * 1. fork a replacement;
   * 2. once the replacement is listening, disconnect the old worker;
   * 3. after 1 s, send SIGTERM if it is still alive;
   * 4. wait 2 s before moving on to the next worker.
   * Emits 'reloadComplete' when done. Does nothing once shutdown has begun.
   */
  reload() {
    if (this.isShuttingDown) return;
    console.log('开始滚动重启工作进程...');
    const workers = Array.from(this.workers.values());
    let index = 0;

    const reloadNext = () => {
      if (this.isShuttingDown) return;
      if (index >= workers.length) {
        console.log('滚动重启完成');
        this.emit('reloadComplete');
        return;
      }

      const { worker } = workers[index++];
      // The worker may have died (and been replaced) since the snapshot.
      if (worker.isDead()) {
        reloadNext();
        return;
      }
      console.log(`重启工作进程 ${worker.process.pid}`);

      const newWorker = this.createWorker();
      // Retire the old worker only after its replacement accepts connections.
      newWorker.once('listening', () => {
        if (!worker.isDead()) {
          // Mark first so handleWorkerExit() does not fork a second replacement.
          this.retiringWorkers.add(worker.id);
          worker.disconnect();
        }
        setTimeout(() => {
          if (!worker.isDead()) {
            worker.kill('SIGTERM');
          }
          setTimeout(reloadNext, 2000);
        }, 1000);
      });
    };

    reloadNext();
  }
}
// worker.js - example worker-process code
// cluster.isMaster is deprecated since Node.js 16; prefer cluster.isPrimary.
if (!(cluster.isPrimary ?? cluster.isMaster)) {
  const http = require('http');
  let requestCount = 0;

  // Upper bound for a graceful shutdown before the worker exits anyway.
  const SHUTDOWN_TIMEOUT_MS = 10000;

  // Sends a message to the primary only while the IPC channel is open;
  // after a disconnect, process.send() would fail.
  const sendToPrimary = (message) => {
    if (process.connected) {
      process.send(message);
    }
  };

  const server = http.createServer((req, res) => {
    requestCount++;
    // Report the request count to the primary every 100 requests.
    if (requestCount % 100 === 0) {
      sendToPrimary({
        type: 'request-count',
        count: requestCount,
        increment: 100
      });
    }
    // Simulate some processing time.
    setTimeout(() => {
      res.writeHead(200, { 'Content-Type': 'application/json' });
      res.end(JSON.stringify({
        message: 'Hello from cluster',
        worker: process.pid,
        requests: requestCount,
        timestamp: new Date().toISOString()
      }));
    }, Math.random() * 100);
  });

  // Answer health checks from the primary; ignore anything else (incl. non-objects).
  process.on('message', (message) => {
    if (message?.type === 'health-check') {
      sendToPrimary({
        type: 'health-response',
        timestamp: Date.now(),
        memory: process.memoryUsage()
      });
    }
  });

  server.listen(3000, () => {
    console.log(`工作进程 ${process.pid} 启动成功`);
  });

  // Graceful shutdown: stop accepting connections, exit when the server has closed.
  process.on('SIGTERM', () => {
    console.log(`工作进程 ${process.pid} 收到SIGTERM,准备关闭...`);
    server.close(() => {
      console.log(`工作进程 ${process.pid} 已关闭`);
      process.exit(0);
    });
    // Idle keep-alive sockets would otherwise hold close() open
    // (closeIdleConnections exists on newer Node.js versions only).
    server.closeIdleConnections?.();
    // Never hang forever; unref() so this timer alone doesn't delay a clean exit.
    setTimeout(() => {
      console.warn(`工作进程 ${process.pid} 关闭超时,强制退出`);
      process.exit(1);
    }, SHUTDOWN_TIMEOUT_MS).unref();
  });
}
// Usage: run the cluster manager in the primary process.
// cluster.isMaster is deprecated since Node.js 16; prefer cluster.isPrimary.
if (cluster.isPrimary ?? cluster.isMaster) {
  const manager = new ClusterManager({
    workerCount: 4,
    maxRestarts: 5,
    restartDelay: 2000
  });

  manager.on('started', () => {
    console.log('集群启动完成');
  });

  manager.on('workerError', (worker, error) => {
    console.error(`工作进程错误: ${worker.process.pid}`, error);
  });

  manager.start();

  // Demo: trigger a rolling restart after 30 s...
  const reloadTimer = setTimeout(() => {
    manager.reload();
  }, 30000);
  // ...unless shutdown began first. reload() forks new workers, and a shutdown
  // in progress only waits for the workers it already snapshotted.
  manager.once('shutdown', () => clearTimeout(reloadTimer));
}
// 自定义负载均衡可以根据业务需求优化请求分发:
// 🎉 自定义负载均衡实现
const cluster = require('cluster');
const net = require('net');
/**
 * Picks a worker for each incoming connection according to `strategy`:
 * 'round-robin' (default), 'least-connections', 'least-response-time' or
 * 'ip-hash'. Workers are any objects with an `id` property (e.g. cluster.Worker).
 *
 * NOTE(review): nothing here hands the connection to the selected worker;
 * callers must do that dispatching themselves.
 */
class LoadBalancer {
  /**
   * @param {object} [options]
   * @param {string} [options.strategy='round-robin'] - Unknown values fall
   *   back to round-robin.
   */
  constructor(options = {}) {
    this.strategy = options.strategy || 'round-robin';
    this.workers = [];
    // Index of the worker roundRobin() returns next.
    this.currentIndex = 0;
    // worker.id -> total requests recorded via recordRequest()
    this.requestCounts = new Map();
    // worker.id -> requests started but not yet ended (see recordRequestEnd())
    this.activeConnections = new Map();
    // worker.id -> most recent response times (at most 100 samples)
    this.responseTimes = new Map();
  }

  /** Registers a worker with zeroed counters. */
  addWorker(worker) {
    this.workers.push(worker);
    this.requestCounts.set(worker.id, 0);
    this.activeConnections.set(worker.id, 0);
    this.responseTimes.set(worker.id, []);
  }

  /** Unregisters a worker (matched by identity) and drops its counters. */
  removeWorker(worker) {
    const index = this.workers.indexOf(worker);
    if (index === -1) return;
    this.workers.splice(index, 1);
    this.requestCounts.delete(worker.id);
    this.activeConnections.delete(worker.id);
    this.responseTimes.delete(worker.id);
    // Removing an entry before the cursor shifts later workers left; keep
    // the cursor on the same "next" worker.
    if (index < this.currentIndex) {
      this.currentIndex--;
    }
  }

  /**
   * @param {{remoteAddress?: string}} [connection] - Only used by 'ip-hash'.
   * @returns {object|null} The chosen worker, or null when none are registered.
   */
  selectWorker(connection) {
    if (this.workers.length === 0) return null;
    switch (this.strategy) {
      case 'round-robin':
        return this.roundRobin();
      case 'least-connections':
        return this.leastConnections();
      case 'least-response-time':
        return this.leastResponseTime();
      case 'ip-hash':
        return this.ipHash(connection);
      default:
        return this.roundRobin();
    }
  }

  /** Cycles through the workers in registration order; null when empty. */
  roundRobin() {
    if (this.workers.length === 0) return null;
    // Re-clamp: the pool may have shrunk since the cursor last advanced, which
    // would otherwise index past the end and return undefined.
    this.currentIndex %= this.workers.length;
    const worker = this.workers[this.currentIndex];
    this.currentIndex = (this.currentIndex + 1) % this.workers.length;
    return worker;
  }

  /**
   * Worker with the fewest in-flight requests (recordRequest() minus
   * recordRequestEnd()). Ties go to the earliest registered. Null when empty.
   */
  leastConnections() {
    if (this.workers.length === 0) return null;
    return this.workers.reduce((min, worker) => {
      const minActive = this.activeConnections.get(min.id) || 0;
      const workerActive = this.activeConnections.get(worker.id) || 0;
      return workerActive < minActive ? worker : min;
    });
  }

  /**
   * Worker with the lowest average recorded response time. Workers without
   * samples average 0. Null when empty.
   */
  leastResponseTime() {
    if (this.workers.length === 0) return null;
    return this.workers.reduce((fastest, worker) => {
      const fastestAvg = this.getAverageResponseTime(fastest.id);
      const workerAvg = this.getAverageResponseTime(worker.id);
      return workerAvg < fastestAvg ? worker : fastest;
    });
  }

  /**
   * Maps the client IP to a worker index. The mapping is stable only while the
   * worker list is unchanged. A missing connection/address hashes '127.0.0.1'.
   */
  ipHash(connection) {
    if (this.workers.length === 0) return null;
    const ip = connection?.remoteAddress || '127.0.0.1';
    const index = Math.abs(this.hashCode(ip)) % this.workers.length;
    return this.workers[index];
  }

  /** 32-bit string hash: h = h * 31 + charCode, truncated to int32 each step. */
  hashCode(str) {
    let hash = 0;
    for (let i = 0; i < str.length; i++) {
      hash = ((hash << 5) - hash + str.charCodeAt(i)) | 0;
    }
    return hash;
  }

  /** Mean of the stored response times for `workerId`, or 0 without samples. */
  getAverageResponseTime(workerId) {
    const times = this.responseTimes.get(workerId) || [];
    if (times.length === 0) return 0;
    return times.reduce((a, b) => a + b, 0) / times.length;
  }

  /** Counts a request routed to `workerId` (total and in-flight). */
  recordRequest(workerId) {
    this.requestCounts.set(workerId, (this.requestCounts.get(workerId) || 0) + 1);
    this.activeConnections.set(workerId, (this.activeConnections.get(workerId) || 0) + 1);
  }

  /**
   * Marks one in-flight request on `workerId` as finished so 'least-connections'
   * reflects current load. Never goes below zero; unknown ids are ignored.
   */
  recordRequestEnd(workerId) {
    if (!this.activeConnections.has(workerId)) return;
    const active = this.activeConnections.get(workerId);
    this.activeConnections.set(workerId, Math.max(0, active - 1));
  }

  /** Appends a response-time sample, keeping only the most recent 100. */
  recordResponseTime(workerId, responseTime) {
    const times = this.responseTimes.get(workerId) || [];
    times.push(responseTime);
    if (times.length > 100) {
      times.shift();
    }
    this.responseTimes.set(workerId, times);
  }

  /** @returns {object} Strategy, worker count, and per-worker counters/averages. */
  getStats() {
    return {
      strategy: this.strategy,
      workers: this.workers.length,
      requestCounts: Object.fromEntries(this.requestCounts),
      activeConnections: Object.fromEntries(this.activeConnections),
      averageResponseTimes: Object.fromEntries(
        Array.from(this.responseTimes.keys(), (id) => [id, this.getAverageResponseTime(id)])
      )
    };
  }
}
// PM2是生产环境中最流行的Node.js进程管理器:
// 🎉 PM2 configuration file example
// ecosystem.config.js
// App definitions plus deploy targets; used by the `pm2 start ecosystem.config.js`
// and `pm2 deploy production` commands listed after this file.
module.exports = {
  apps: [
    {
      name: 'my-app',
      script: './app.js',
      instances: 'max', // or a fixed number, e.g. 4
      exec_mode: 'cluster',
      // Environment variables (default `env` block)
      // NOTE(review): post-deploy below runs `--env production`, but there is no
      // `env_production` block; production values live in `env`. Confirm intended.
      env: {
        NODE_ENV: 'production',
        PORT: 3000
      },
      // Development environment variables
      env_development: {
        NODE_ENV: 'development',
        PORT: 3001
      },
      // Log configuration
      log_file: './logs/combined.log',
      out_file: './logs/out.log',
      error_file: './logs/error.log',
      log_date_format: 'YYYY-MM-DD HH:mm:ss Z',
      // Auto-restart configuration
      watch: false, // recommended off in production
      ignore_watch: ['node_modules', 'logs'],
      max_restarts: 10,
      min_uptime: '10s',
      // Memory limit
      max_memory_restart: '1G',
      // Other settings
      kill_timeout: 5000,
      // NOTE(review): per PM2 docs, wait_ready makes PM2 wait (up to listen_timeout ms)
      // for the app to call process.send('ready'); confirm app.js sends it.
      wait_ready: true,
      listen_timeout: 10000
    }
  ],
  deploy: {
    production: {
      user: 'deploy',
      host: ['server1.example.com', 'server2.example.com'],
      ref: 'origin/master',
      repo: 'git@github.com:username/repository.git',
      path: '/var/www/production',
      'post-deploy': 'npm install && pm2 reload ecosystem.config.js --env production'
    }
  }
};
# PM2常用命令
# Start the app
pm2 start ecosystem.config.js
pm2 start app.js --name "my-app" -i max
# Manage apps
pm2 list                       # list all apps
pm2 show my-app                # show app details
pm2 logs my-app                # show logs
pm2 monit                      # live monitoring
# Restart and reload
pm2 restart my-app             # restart the app
pm2 reload my-app              # zero-downtime reload
# Note: `pm2 gracefulReload` was removed in PM2 3.x; `pm2 reload` above is the graceful reload.
# Stop and delete
pm2 stop my-app                # stop the app
pm2 delete my-app              # remove the app from PM2
# Cluster management
pm2 scale my-app 4             # scale to 4 instances
pm2 scale my-app +2            # add 2 instances
# Save and restore
pm2 save                       # save the current process list
pm2 resurrect                  # restore the saved process list
pm2 startup                    # set up start on boot (prints a command to run)
# Monitoring and logs
pm2 logs --lines 200           # show the last 200 log lines
pm2 flush                      # clear logs
pm2 install pm2-logrotate      # install log rotation
# Deployment
pm2 deploy production setup    # initialize the deployment
pm2 deploy production          # deploy to production
# PM2优势:
💼 生产环境建议:使用PM2管理生产环境的Node.js应用,配置适当的监控和告警机制
通过本节Node.js集群Cluster的学习,你已经掌握:
A: 集群模式创建多个相互独立的进程,各进程内存彼此隔离;Worker Threads在同一进程内创建多个线程,每个线程有独立的JS堆,但可以通过SharedArrayBuffer共享内存。集群适合把整个服务(例如处理大量I/O请求的HTTP服务)横向扩展到多个CPU核心;Worker Threads适合在单个进程内卸载CPU密集型计算,避免阻塞事件循环。
A: 通常设置为CPU核心数,但需要根据应用特性、内存使用情况和负载测试结果进行调整。
A: 各工作进程的内存相互隔离,因此不要把需要共享的状态(如会话、缓存)只保存在单个进程的内存中,而应使用Redis、数据库、文件系统或进程间通信(IPC)来共享。
A: PM2提供更丰富的功能,如监控、日志管理、部署工具等,而原生Cluster需要自己实现这些功能。
A: 使用PM2的reload命令或自己实现滚动重启,逐个重启工作进程,确保始终有进程在处理请求。
// Problem: several processes try to bind the same port.
// Fix: use the cluster module correctly.
const cluster = require('cluster');
const http = require('http');

// cluster.isPrimary replaced the deprecated cluster.isMaster in Node.js 16.
if (cluster.isPrimary ?? cluster.isMaster) {
  // The primary does not listen on the port itself; it only forks workers.
  for (let i = 0; i < 2; i++) {
    cluster.fork();
  }
} else {
  // Only workers listen; the cluster module lets them share port 3000.
  const server = http.createServer((req, res) => {
    res.end(`Worker ${process.pid}`);
  });
  server.listen(3000, () => {
    console.log(`Worker ${process.pid} listening on port 3000`);
  });
}
// Problem: memory leak in a worker process
// Fix: monitor memory usage and restart the process when it grows too large.
/**
 * Every `intervalMs` ms, logs this process's RSS. Once RSS exceeds `limitMB`,
 * exits with code 1; a supervisor (cluster primary / PM2) is expected to start
 * a replacement.
 *
 * @param {number} [limitMB=500] - RSS threshold in MB.
 * @param {number} [intervalMs=30000] - Check interval in ms.
 * @returns {NodeJS.Timeout} The interval handle; pass to clearInterval() to stop monitoring.
 */
function monitorMemory(limitMB = 500, intervalMs = 30000) {
  const timer = setInterval(() => {
    const rss = process.memoryUsage().rss / 1024 / 1024; // MB
    console.log(`进程 ${process.pid} 内存使用: ${rss.toFixed(2)}MB`);
    if (rss > limitMB) {
      console.log('内存使用过高,准备重启进程');
      process.exit(1);
    }
  }, intervalMs);
  // The monitor alone should not keep an otherwise finished process alive.
  timer.unref();
  return timer;
}
// Only worker processes monitor (and may exit on) their own memory usage.
// cluster.isMaster is deprecated since Node.js 16; prefer cluster.isPrimary.
if (!(cluster.isPrimary ?? cluster.isMaster)) {
  monitorMemory();
}
"掌握集群技术是构建企业级Node.js应用的核心技能,多进程架构让你的应用具备高可用性和可扩展性!"