Search K
Appearance
Appearance
📊 SEO元描述:2024年最新Node.js WebSocket教程,详解WebSocket协议、ws库使用、实时通信应用。包含完整Socket.IO框架和聊天室实现,适合高级开发者掌握实时Web应用开发。
核心关键词:Node.js WebSocket2024、WebSocket协议、实时通信、ws库、Socket.IO、双向通信
长尾关键词:Node.js WebSocket怎么用、WebSocket实时通信、Socket.IO框架、实时聊天应用、WebSocket性能优化
通过本节Node.js WebSocket实现,你将系统性掌握:
WebSocket是什么?这是一种在单个TCP连接上进行全双工通信的协议。WebSocket提供了浏览器和服务器之间的实时双向通信能力,也是构建现代实时Web应用的核心技术。
💡 学习建议:WebSocket适合需要实时双向通信的应用,如聊天室、在线游戏、实时协作、股票行情等
// 🎉 基础WebSocket服务器示例
const WebSocket = require('ws');
const http = require('http');
// Create the HTTP server that carries the WebSocket upgrade handshake
const server = http.createServer();

// Create the WebSocket server on top of it; only upgrades on /ws are accepted
const wss = new WebSocket.Server({
  server,
  path: '/ws'
});

// How often each connection is pinged, in milliseconds
const HEARTBEAT_INTERVAL_MS = 30000;

// Handle each incoming WebSocket connection
wss.on('connection', (ws, request) => {
  console.log('新的WebSocket连接:', request.socket.remoteAddress);

  // Greet the client
  ws.send(JSON.stringify({
    type: 'welcome',
    message: '欢迎连接到WebSocket服务器!',
    timestamp: new Date().toISOString()
  }));

  // Echo every well-formed JSON message back to its sender
  ws.on('message', (data) => {
    try {
      const message = JSON.parse(data);
      console.log('收到消息:', message);
      ws.send(JSON.stringify({
        type: 'echo',
        original: message,
        timestamp: new Date().toISOString()
      }));
    } catch (err) {
      console.error('消息解析错误:', err.message);
    }
  });

  // Heartbeat: ping periodically and drop peers that did not answer the
  // previous ping, so half-open connections do not linger forever.
  let isAlive = true;
  const heartbeat = setInterval(() => {
    if (ws.readyState !== WebSocket.OPEN) {
      clearInterval(heartbeat);
      return;
    }
    if (!isAlive) {
      // No pong since the last ping: force-close; the 'close' handler
      // below releases the timer.
      ws.terminate();
      return;
    }
    isAlive = false;
    ws.ping();
  }, HEARTBEAT_INTERVAL_MS);

  // A pong proves the peer is still reachable
  ws.on('pong', () => {
    isAlive = true;
    console.log('收到心跳响应');
  });

  // Release the heartbeat timer as soon as the connection is gone instead of
  // waiting for the next tick
  ws.on('close', (code, reason) => {
    clearInterval(heartbeat);
    console.log(`连接关闭: ${code} - ${reason}`);
  });

  // Log socket-level errors
  ws.on('error', (err) => {
    console.error('WebSocket错误:', err.message);
  });
});

// Start the server
const PORT = 3000;
server.listen(PORT, () => {
  console.log(`WebSocket服务器启动: http://localhost:${PORT}`);
});
// 🎉 WebSocket client example (Node.js environment)
const WebSocket = require('ws');
/**
 * WebSocket client with fixed-interval automatic reconnection.
 *
 * Relies on a `WebSocket` constructor in scope that exposes the `ws`-style
 * `.on(event, handler)` API and a static `OPEN` ready-state constant.
 */
class WebSocketClient {
  /**
   * @param {string} url - WebSocket endpoint, e.g. `ws://localhost:3000/ws`.
   */
  constructor(url) {
    this.url = url;
    this.ws = null;
    this.reconnectInterval = 5000;
    this.maxReconnectAttempts = 10;
    this.reconnectAttempts = 0;
    this.isConnecting = false;
    // Handle of the pending reconnect timeout, or null when none is scheduled
    this.reconnectTimer = null;
    // Cleared by disconnect() so a deliberate close does not trigger a reconnect
    this.shouldReconnect = true;
  }

  /**
   * Opens the connection. Does nothing while a connection attempt is in
   * progress or the current socket is already open.
   */
  connect() {
    const alreadyOpen = this.ws !== null && this.ws.readyState === WebSocket.OPEN;
    if (this.isConnecting || alreadyOpen) return;

    this.isConnecting = true;
    this.shouldReconnect = true;
    console.log('连接WebSocket服务器...');

    try {
      this.ws = new WebSocket(this.url);
    } catch (err) {
      // A synchronous failure (e.g. malformed URL) must not leave the client
      // stuck in the "connecting" state.
      this.isConnecting = false;
      throw err;
    }

    this.ws.on('open', () => {
      console.log('WebSocket连接成功');
      this.isConnecting = false;
      this.reconnectAttempts = 0;
      // Send a test message
      this.send({
        type: 'greeting',
        message: 'Hello from client!'
      });
    });

    this.ws.on('message', (data) => {
      try {
        const message = JSON.parse(data);
        this.handleMessage(message);
      } catch (err) {
        console.error('消息解析错误:', err.message);
      }
    });

    this.ws.on('close', (code, reason) => {
      console.log(`连接关闭: ${code} - ${reason}`);
      this.isConnecting = false;
      this.scheduleReconnect();
    });

    this.ws.on('error', (err) => {
      console.error('WebSocket错误:', err.message);
      this.isConnecting = false;
    });
  }

  /**
   * Logs a parsed server message according to its `type`.
   * @param {{type: string}} message - Parsed JSON message from the server.
   */
  handleMessage(message) {
    console.log('收到服务器消息:', message);
    switch (message.type) {
      case 'welcome':
        console.log('服务器欢迎消息:', message.message);
        break;
      case 'echo':
        console.log('回显消息:', message.original);
        break;
      default:
        console.log('未知消息类型:', message.type);
    }
  }

  /**
   * Serializes `data` as JSON and sends it if the socket is open.
   * @param {*} data - JSON-serializable payload.
   * @returns {boolean} true when the payload was handed to the socket,
   *   false when the socket is not open.
   */
  send(data) {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(data));
      return true;
    }
    console.warn('WebSocket未连接,无法发送消息');
    return false;
  }

  /**
   * Schedules one reconnect attempt after `reconnectInterval` ms, unless the
   * client was closed on purpose, an attempt is already pending, or the
   * attempt budget is exhausted.
   */
  scheduleReconnect() {
    if (!this.shouldReconnect || this.reconnectTimer !== null) return;

    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.log('达到最大重连次数,停止重连');
      return;
    }

    this.reconnectAttempts++;
    console.log(`${this.reconnectInterval}ms后尝试第${this.reconnectAttempts}次重连`);
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.connect();
    }, this.reconnectInterval);
  }

  /**
   * Closes the connection for good: cancels any pending reconnect and
   * prevents the resulting 'close' event from scheduling a new one.
   * Call connect() to resume.
   */
  disconnect() {
    this.shouldReconnect = false;
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    if (this.ws) {
      this.ws.close();
    }
  }
}
// 使用WebSocket客户端
const client = new WebSocketClient('ws://localhost:3000/ws');
client.connect();
// 定期发送消息
setInterval(() => {
client.send({
type: 'ping',
timestamp: new Date().toISOString()
});
}, 10000);高级WebSocket服务器需要处理房间管理、用户认证、消息广播等功能:
// 🎉 高级WebSocket服务器实现
const WebSocket = require('ws');
const http = require('http');
const url = require('url');
const EventEmitter = require('events');
/**
 * WebSocket server with client tracking, rooms, private messages and
 * per-connection heartbeats.
 *
 * Emits:
 *  - 'clientConnected' (clientInfo)
 *  - 'clientDisconnected' (clientInfo)
 *  - 'customMessage' (clientId, message) for message types it does not handle
 *  - 'error' (err) for server-level errors, only while a listener is attached
 */
class AdvancedWebSocketServer extends EventEmitter {
  /**
   * @param {object} [options]
   * @param {number} [options.port=3000] - Port to listen on; 0 is honoured.
   * @param {string} [options.path='/ws'] - Path accepted for upgrades.
   * @param {number} [options.heartbeatInterval=30000] - Ping period in ms.
   */
  constructor(options = {}) {
    super();
    // `!= null` instead of `||` so that port 0 is not replaced by the default
    this.port = options.port != null ? options.port : 3000;
    this.path = options.path || '/ws';
    this.heartbeatIntervalMs = options.heartbeatInterval || 30000;
    this.clients = new Map(); // clientId -> client info
    this.rooms = new Map(); // roomName -> room info
    this.messageQueue = []; // message queue (not used by this class itself)
    this.server = http.createServer();
    this.wss = new WebSocket.Server({
      server: this.server,
      path: this.path,
      verifyClient: this.verifyClient.bind(this)
    });
    this.setupEventHandlers();
  }

  /**
   * Handshake gate passed to the WebSocket server as `verifyClient`.
   * Accepts any request whose query string carries a non-empty `token`.
   * NOTE(review): the token is written to the log and is not actually
   * validated; replace both before using this outside a demo.
   * @param {{req: {url: string}}} info
   * @returns {boolean} true to accept the connection.
   */
  verifyClient(info) {
    const query = url.parse(info.req.url, true).query;
    if (query.token) {
      console.log('客户端认证成功:', query.token);
      return true;
    }
    console.log('客户端认证失败');
    return false;
  }

  /** Wires the WebSocket server's 'connection' and 'error' events. */
  setupEventHandlers() {
    this.wss.on('connection', (ws, request) => {
      this.handleConnection(ws, request);
    });
    this.wss.on('error', (err) => {
      console.error('WebSocket服务器错误:', err.message);
      // EventEmitter throws when 'error' is emitted without a listener; that
      // would turn an already-logged error (e.g. a failed listen) into a crash
      // and prevent start() from rejecting.
      if (this.listenerCount('error') > 0) {
        this.emit('error', err);
      }
    });
  }

  /**
   * Registers a freshly accepted socket: stores its client record, sends the
   * 'connected' acknowledgement, wires its events and starts its heartbeat.
   * @param {object} ws - The accepted WebSocket.
   * @param {object} request - The HTTP upgrade request.
   */
  handleConnection(ws, request) {
    const clientId = this.generateClientId();
    const query = url.parse(request.url, true).query;

    // Every id starts with the constant "client_" prefix, so the default name
    // is derived from what follows it; otherwise only one random character
    // would end up in the name.
    const defaultName = `User_${clientId.replace(/^client_/, '').slice(0, 8)}`;

    const clientInfo = {
      id: clientId,
      ws: ws,
      ip: request.socket.remoteAddress,
      userAgent: request.headers['user-agent'],
      connectedAt: new Date(),
      lastActivity: new Date(),
      rooms: new Set(),
      isAlive: true, // flipped by the heartbeat, restored by 'pong'
      heartbeatTimer: null,
      user: {
        name: query.name || defaultName,
        token: query.token
      }
    };
    this.clients.set(clientId, clientInfo);
    console.log(`客户端连接: ${clientInfo.user.name} (${clientId})`);

    // Connection acknowledgement
    this.sendToClient(clientId, {
      type: 'connected',
      clientId: clientId,
      serverTime: new Date().toISOString()
    });

    ws.on('message', (data) => {
      this.handleMessage(clientId, data);
    });
    ws.on('close', (code, reason) => {
      this.handleDisconnection(clientId, code, reason);
    });
    ws.on('error', (err) => {
      console.error(`客户端错误 ${clientId}:`, err.message);
    });

    this.setupHeartbeat(clientId);
    this.emit('clientConnected', clientInfo);
  }

  /**
   * Parses a raw frame as JSON and dispatches it. Any exception raised while
   * parsing or processing is reported to the sender as a format error.
   * @param {string} clientId
   * @param {string|Buffer} data - Raw frame payload.
   */
  handleMessage(clientId, data) {
    const client = this.clients.get(clientId);
    if (!client) return;
    client.lastActivity = new Date();
    try {
      const message = JSON.parse(data);
      console.log(`收到消息 ${clientId}:`, message);
      this.processMessage(clientId, message);
    } catch (err) {
      console.error('消息解析错误:', err.message);
      this.sendToClient(clientId, {
        type: 'error',
        message: '消息格式错误'
      });
    }
  }

  /**
   * Dispatches a parsed message by its `type`. Unknown types are re-emitted
   * as 'customMessage'.
   * @param {string} clientId
   * @param {object} message - Parsed JSON payload.
   */
  processMessage(clientId, message) {
    const client = this.clients.get(clientId);
    if (!client) return;

    // JSON.parse also yields null, numbers, strings...; only objects carry a type
    if (message === null || typeof message !== 'object') {
      this.sendToClient(clientId, {
        type: 'error',
        message: '消息格式错误'
      });
      return;
    }

    switch (message.type) {
      case 'join_room':
        this.joinRoom(clientId, message.room);
        break;
      case 'leave_room':
        this.leaveRoom(clientId, message.room);
        break;
      case 'room_message':
        // Only members may post, otherwise any client could inject messages
        // into rooms it never joined.
        if (!client.rooms.has(message.room)) {
          this.sendToClient(clientId, {
            type: 'error',
            message: `未加入房间 ${message.room}`
          });
          break;
        }
        this.broadcastToRoom(message.room, {
          type: 'room_message',
          from: client.user.name,
          message: message.message,
          timestamp: new Date().toISOString()
        }, clientId);
        break;
      case 'private_message':
        this.sendPrivateMessage(clientId, message.to, message.message);
        break;
      case 'get_rooms':
        this.sendToClient(clientId, {
          type: 'rooms_list',
          rooms: this.getRoomsList()
        });
        break;
      case 'ping':
        this.sendToClient(clientId, {
          type: 'pong',
          timestamp: new Date().toISOString()
        });
        break;
      default:
        this.emit('customMessage', clientId, message);
    }
  }

  /**
   * Adds a client to a room, creating the room on first use, then notifies
   * the client ('joined_room') and the other members ('user_joined').
   * @param {string} clientId
   * @param {string} roomName - Must be a non-empty string.
   */
  joinRoom(clientId, roomName) {
    const client = this.clients.get(clientId);
    if (!client) return;

    // Reject missing / non-string names instead of creating a room keyed by
    // `undefined` or by a throwaway object.
    if (typeof roomName !== 'string' || roomName === '') {
      this.sendToClient(clientId, {
        type: 'error',
        message: '房间名称无效'
      });
      return;
    }

    if (!this.rooms.has(roomName)) {
      this.rooms.set(roomName, {
        name: roomName,
        clients: new Set(),
        createdAt: new Date(),
        messageHistory: []
      });
    }
    const room = this.rooms.get(roomName);
    room.clients.add(clientId);
    client.rooms.add(roomName);
    console.log(`${client.user.name} 加入房间: ${roomName}`);

    this.sendToClient(clientId, {
      type: 'joined_room',
      room: roomName,
      memberCount: room.clients.size
    });
    this.broadcastToRoom(roomName, {
      type: 'user_joined',
      user: client.user.name,
      memberCount: room.clients.size
    }, clientId);
  }

  /**
   * Removes a client from a room, deletes the room once empty, and notifies
   * the remaining members ('user_left') and the client ('left_room').
   * @param {string} clientId
   * @param {string} roomName
   */
  leaveRoom(clientId, roomName) {
    const client = this.clients.get(clientId);
    const room = this.rooms.get(roomName);
    if (!client || !room) return;

    room.clients.delete(clientId);
    client.rooms.delete(roomName);
    console.log(`${client.user.name} 离开房间: ${roomName}`);

    if (room.clients.size === 0) {
      this.rooms.delete(roomName);
    } else {
      this.broadcastToRoom(roomName, {
        type: 'user_left',
        user: client.user.name,
        memberCount: room.clients.size
      });
    }

    this.sendToClient(clientId, {
      type: 'left_room',
      room: roomName
    });
  }

  /**
   * Sends a message to every open member of a room and appends it to the
   * room's history (capped at the 100 most recent entries).
   * @param {string} roomName
   * @param {object} message - JSON-serializable payload.
   * @param {?string} [excludeClientId=null] - Member to skip (usually the sender).
   */
  broadcastToRoom(roomName, message, excludeClientId = null) {
    const room = this.rooms.get(roomName);
    if (!room) return;

    // Serialize once for all recipients
    const messageData = JSON.stringify(message);
    for (const memberId of room.clients) {
      if (memberId === excludeClientId) continue;
      const member = this.clients.get(memberId);
      if (member && member.ws.readyState === WebSocket.OPEN) {
        member.ws.send(messageData);
      }
    }

    room.messageHistory.push({
      ...message,
      timestamp: new Date().toISOString()
    });
    if (room.messageHistory.length > 100) {
      room.messageHistory = room.messageHistory.slice(-100);
    }
  }

  /**
   * Delivers a private message to the first connected client whose user name
   * equals `toUserName` and confirms delivery to the sender; replies with an
   * error when no such user is connected.
   * @param {string} fromClientId
   * @param {string} toUserName
   * @param {*} message
   */
  sendPrivateMessage(fromClientId, toUserName, message) {
    const fromClient = this.clients.get(fromClientId);
    if (!fromClient) return;

    let toClient = null;
    for (const candidate of this.clients.values()) {
      if (candidate.user.name === toUserName) {
        toClient = candidate;
        break;
      }
    }

    if (!toClient) {
      this.sendToClient(fromClientId, {
        type: 'error',
        message: `用户 ${toUserName} 不在线`
      });
      return;
    }

    this.sendToClient(toClient.id, {
      type: 'private_message',
      from: fromClient.user.name,
      message: message,
      timestamp: new Date().toISOString()
    });
    this.sendToClient(fromClientId, {
      type: 'private_message_sent',
      to: toUserName,
      message: message
    });
  }

  /**
   * Sends a JSON payload to one client.
   * @param {string} clientId
   * @param {*} data - JSON-serializable payload.
   * @returns {boolean} true if the client exists and its socket is open.
   */
  sendToClient(clientId, data) {
    const client = this.clients.get(clientId);
    if (client && client.ws.readyState === WebSocket.OPEN) {
      client.ws.send(JSON.stringify(data));
      return true;
    }
    return false;
  }

  /**
   * Sends a JSON payload to every open client.
   * @param {*} data - JSON-serializable payload.
   * @param {?string} [excludeClientId=null] - Client to skip.
   */
  broadcast(data, excludeClientId = null) {
    const messageData = JSON.stringify(data);
    for (const [clientId, client] of this.clients) {
      if (clientId !== excludeClientId && client.ws.readyState === WebSocket.OPEN) {
        client.ws.send(messageData);
      }
    }
  }

  /**
   * Starts the ping/pong heartbeat for a client. A client that has not
   * answered the previous ping by the next tick is terminated. The timer is
   * stored on the client record so handleDisconnection() can release it.
   * @param {string} clientId
   */
  setupHeartbeat(clientId) {
    const client = this.clients.get(clientId);
    if (!client) return;

    client.isAlive = true;
    const timer = setInterval(() => {
      const isStale = this.clients.get(clientId) !== client;
      if (isStale || client.ws.readyState !== WebSocket.OPEN) {
        clearInterval(timer);
        return;
      }
      if (!client.isAlive) {
        // No pong since the last ping: drop the connection
        client.ws.terminate();
        return;
      }
      client.isAlive = false;
      client.ws.ping();
    }, this.heartbeatIntervalMs);
    client.heartbeatTimer = timer;

    client.ws.on('pong', () => {
      client.isAlive = true;
      client.lastActivity = new Date();
    });
  }

  /**
   * Cleans up after a closed socket: stops its heartbeat, removes it from
   * all rooms and from the client map, then emits 'clientDisconnected'.
   * @param {string} clientId
   * @param {number} code - Close code.
   * @param {string|Buffer} reason - Close reason.
   */
  handleDisconnection(clientId, code, reason) {
    const client = this.clients.get(clientId);
    if (!client) return;
    console.log(`客户端断开: ${client.user.name} (${code} - ${reason})`);

    if (client.heartbeatTimer !== null) {
      clearInterval(client.heartbeatTimer);
      client.heartbeatTimer = null;
    }

    // Iterate over a snapshot: leaveRoom() mutates client.rooms
    for (const roomName of [...client.rooms]) {
      this.leaveRoom(clientId, roomName);
    }

    this.clients.delete(clientId);
    this.emit('clientDisconnected', client);
  }

  /**
   * @returns {string} Id of the form `client_<random>_<timestamp>`.
   */
  generateClientId() {
    const randomPart = Math.random().toString(36).slice(2, 11);
    return `client_${randomPart}_${Date.now()}`;
  }

  /**
   * @returns {Array<{name: string, memberCount: number, createdAt: Date}>}
   *   Summary of every existing room.
   */
  getRoomsList() {
    const rooms = [];
    for (const [name, room] of this.rooms) {
      rooms.push({
        name: name,
        memberCount: room.clients.size,
        createdAt: room.createdAt
      });
    }
    return rooms;
  }

  /**
   * @returns {object} Client/room counts plus process uptime and memory usage.
   */
  getServerStats() {
    return {
      connectedClients: this.clients.size,
      activeRooms: this.rooms.size,
      uptime: process.uptime(),
      memory: process.memoryUsage(),
      timestamp: new Date().toISOString()
    };
  }

  /**
   * Starts listening on the configured port.
   * @returns {Promise<void>} Resolves once the server is listening; rejects
   *   with the listen error (e.g. EADDRINUSE).
   */
  start() {
    return new Promise((resolve, reject) => {
      // listen()'s callback is only a 'listening' listener and never receives
      // an error; failures arrive through the 'error' event instead.
      const onError = (err) => {
        this.server.removeListener('listening', onListening);
        reject(err);
      };
      const onListening = () => {
        this.server.removeListener('error', onError);
        console.log(`高级WebSocket服务器启动: http://localhost:${this.port}${this.path}`);
        resolve();
      };
      this.server.once('error', onError);
      this.server.once('listening', onListening);
      this.server.listen(this.port);
    });
  }

  /**
   * Closes every client connection, detaches the WebSocket server and closes
   * the HTTP server.
   * @returns {Promise<void>} Resolves when the HTTP server reports closed.
   */
  stop() {
    return new Promise((resolve) => {
      for (const client of this.clients.values()) {
        if (client.heartbeatTimer !== null) {
          clearInterval(client.heartbeatTimer);
          client.heartbeatTimer = null;
        }
        client.ws.close();
      }
      // Stop accepting upgrades; the underlying HTTP server is closed below
      this.wss.close();
      this.server.close(() => {
        console.log('WebSocket服务器已关闭');
        resolve();
      });
    });
  }
}
// 使用高级WebSocket服务器
const wsServer = new AdvancedWebSocketServer({ port: 3000 });
wsServer.on('clientConnected', (client) => {
console.log(`新客户端: ${client.user.name}`);
});
wsServer.on('clientDisconnected', (client) => {
console.log(`客户端离线: ${client.user.name}`);
});
wsServer.start().catch(console.error);
// 定期输出服务器统计
setInterval(() => {
console.log('服务器统计:', wsServer.getServerStats());
}, 60000);Socket.IO是功能更丰富的实时通信框架:
// 🎉 Socket.IO服务器示例
const express = require('express');
const http = require('http');
const socketIo = require('socket.io');
const app = express();
const server = http.createServer(app);
const io = socketIo(server, {
cors: {
origin: "*",
methods: ["GET", "POST"]
}
});
// 中间件:用户认证
io.use((socket, next) => {
const token = socket.handshake.auth.token;
if (token) {
// 验证token逻辑
socket.userId = token; // 简化示例
next();
} else {
next(new Error('认证失败'));
}
});
// 连接处理
io.on('connection', (socket) => {
console.log('用户连接:', socket.userId);
// 加入用户房间(用于私信)
socket.join(`user_${socket.userId}`);
// 处理加入房间
socket.on('join_room', (roomName) => {
socket.join(roomName);
socket.to(roomName).emit('user_joined', {
userId: socket.userId,
message: `${socket.userId} 加入了房间`
});
console.log(`${socket.userId} 加入房间: ${roomName}`);
});
// 处理离开房间
socket.on('leave_room', (roomName) => {
socket.leave(roomName);
socket.to(roomName).emit('user_left', {
userId: socket.userId,
message: `${socket.userId} 离开了房间`
});
console.log(`${socket.userId} 离开房间: ${roomName}`);
});
// 处理房间消息
socket.on('room_message', (data) => {
socket.to(data.room).emit('room_message', {
from: socket.userId,
message: data.message,
timestamp: new Date().toISOString()
});
});
// 处理私信
socket.on('private_message', (data) => {
socket.to(`user_${data.to}`).emit('private_message', {
from: socket.userId,
message: data.message,
timestamp: new Date().toISOString()
});
});
// 处理广播消息
socket.on('broadcast', (data) => {
socket.broadcast.emit('broadcast_message', {
from: socket.userId,
message: data.message,
timestamp: new Date().toISOString()
});
});
// 断开连接
socket.on('disconnect', (reason) => {
console.log(`用户断开: ${socket.userId}, 原因: ${reason}`);
});
});
// 启动服务器
const PORT = 3001;
server.listen(PORT, () => {
console.log(`Socket.IO服务器启动: http://localhost:${PORT}`);
});Socket.IO的优势:
💼 选择建议:简单应用使用原生WebSocket,复杂应用推荐Socket.IO
通过本节Node.js WebSocket实现的学习,你已经掌握:
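Q: WebSocket和HTTP有什么区别?
A: HTTP是请求-响应模式,WebSocket是全双工通信;HTTP每次请求都有头部开销,WebSocket建立连接后开销很小。
Q: 如何处理WebSocket连接断开?
A: 实现心跳检测、自动重连机制,在客户端和服务端都要处理连接状态。
Q: WebSocket的连接数有限制吗?
A: 理论上没有限制,实际受服务器资源和系统配置影响,单机可支持数万到数十万连接。
Q: WebSocket如何实现负载均衡?
A: 使用粘性会话、Redis共享状态、或专门的WebSocket负载均衡器如HAProxy。
Q: 如何保证WebSocket的安全性?
A: 使用WSS(WebSocket Secure)、token认证、输入验证、校验Origin请求头(Socket.IO还需正确配置CORS)等安全措施。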
// Problem: the WebSocket connection is unstable
// Solution: implement a robust heartbeat and reconnect mechanism

/**
 * WebSocket client (browser-style `onopen` / `onclose` API) with an
 * application-level heartbeat and exponential-backoff reconnection.
 */
class RobustWebSocketClient {
  /**
   * @param {string} url - WebSocket endpoint.
   */
  constructor(url) {
    this.url = url;
    this.ws = null;
    this.reconnectDelay = 1000; // current backoff delay in ms
    this.maxReconnectDelay = 30000; // backoff ceiling in ms
    this.reconnectAttempts = 0;
    this.heartbeatInterval = 30000; // ms between heartbeat pings
    this.heartbeatTimer = null;
    this.reconnectTimer = null;
    // Cleared by disconnect() so a deliberate close is not followed by a reconnect
    this.shouldReconnect = true;
  }

  /**
   * Opens a new connection and wires its lifecycle handlers. A successful
   * open resets the backoff state and starts the heartbeat.
   */
  connect() {
    this.shouldReconnect = true;
    this.ws = new WebSocket(this.url);

    this.ws.onopen = () => {
      console.log('连接成功');
      this.reconnectAttempts = 0;
      this.reconnectDelay = 1000;
      this.startHeartbeat();
    };

    this.ws.onclose = () => {
      console.log('连接关闭');
      this.stopHeartbeat();
      this.scheduleReconnect();
    };

    this.ws.onerror = (error) => {
      console.error('连接错误:', error);
    };
  }

  /**
   * Starts sending `{type: 'ping'}` every `heartbeatInterval` ms while the
   * socket is open. Any previous heartbeat timer is stopped first so that
   * two timers never run at once.
   */
  startHeartbeat() {
    this.stopHeartbeat();
    this.heartbeatTimer = setInterval(() => {
      if (this.ws.readyState === WebSocket.OPEN) {
        this.ws.send(JSON.stringify({ type: 'ping' }));
      }
    }, this.heartbeatInterval);
  }

  /** Stops the heartbeat timer, if one is running. */
  stopHeartbeat() {
    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = null;
    }
  }

  /**
   * Schedules one reconnect attempt after the current backoff delay and
   * doubles the delay (capped at `maxReconnectDelay`) for the next one.
   * Does nothing after disconnect() or while an attempt is already pending.
   */
  scheduleReconnect() {
    if (!this.shouldReconnect || this.reconnectTimer) return;

    this.reconnectTimer = setTimeout(() => {
      // Release the slot before connecting: if connect() throws, the next
      // attempt must still be schedulable.
      this.reconnectTimer = null;
      this.reconnectAttempts++;
      console.log(`重连尝试 ${this.reconnectAttempts}`);

      // Exponential backoff
      this.reconnectDelay = Math.min(
        this.reconnectDelay * 2,
        this.maxReconnectDelay
      );

      try {
        this.connect();
      } catch (error) {
        // A synchronous failure produces no close event, so retry from here
        console.error('连接错误:', error);
        this.scheduleReconnect();
      }
    }, this.reconnectDelay);
  }

  /**
   * Closes the connection for good: cancels the pending reconnect, stops
   * the heartbeat and closes the socket. Call connect() to resume.
   */
  disconnect() {
    this.shouldReconnect = false;
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    this.stopHeartbeat();
    if (this.ws) {
      this.ws.close();
    }
  }
}
// Problem: WebSocket connections cause memory leaks
// Solution: manage the connection lifecycle correctly

/**
 * Registry of WebSocket clients that pings each one periodically and evicts
 * clients that are closed or have shown no activity for 5 minutes.
 */
class ManagedWebSocketServer {
  constructor() {
    this.clients = new Map();
    this.cleanupInterval = 60000; // sweep once a minute
    // Keep the handle so shutdown() can stop the sweep
    this.cleanupTimer = setInterval(() => {
      this.cleanupInactiveClients();
    }, this.cleanupInterval);
  }

  /**
   * Registers a socket under `clientId` and starts its heartbeat. An existing
   * registration under the same id is removed (and its socket terminated)
   * first, so its timer cannot leak.
   * @param {string} clientId
   * @param {object} ws - Socket exposing `on`, `ping`, `terminate`, `readyState`.
   */
  addClient(clientId, ws) {
    this.removeClient(clientId);

    const client = {
      ws,
      lastActivity: Date.now(),
      heartbeatTimer: null
    };
    this.clients.set(clientId, client);

    // Without these listeners lastActivity would never move past the
    // registration time and every client would be evicted after the timeout.
    const markActive = () => {
      client.lastActivity = Date.now();
    };
    ws.on('message', markActive);
    ws.on('pong', markActive);

    // Drop the record as soon as the socket closes. The identity check keeps
    // a late 'close' from an old socket from evicting its replacement.
    ws.on('close', () => {
      if (this.clients.get(clientId) === client) {
        this.removeClient(clientId);
      }
    });

    this.setupClientHeartbeat(clientId);
  }

  /**
   * Starts the 30-second heartbeat for a registered client: pings while the
   * socket is open, removes the client once it is not.
   * @param {string} clientId
   */
  setupClientHeartbeat(clientId) {
    const client = this.clients.get(clientId);
    if (!client) return;

    // Never run two heartbeats for the same client
    if (client.heartbeatTimer) {
      clearInterval(client.heartbeatTimer);
    }

    const timer = setInterval(() => {
      if (this.clients.get(clientId) !== client) {
        // The record was removed or replaced; this timer is obsolete
        clearInterval(timer);
        return;
      }
      if (client.ws.readyState === WebSocket.OPEN) {
        client.ws.ping();
      } else {
        this.removeClient(clientId);
      }
    }, 30000);
    client.heartbeatTimer = timer;
  }

  /**
   * Stops a client's heartbeat, terminates its socket and forgets it.
   * Unknown ids are ignored.
   * @param {string} clientId
   */
  removeClient(clientId) {
    const client = this.clients.get(clientId);
    if (!client) return;

    if (client.heartbeatTimer) {
      clearInterval(client.heartbeatTimer);
      client.heartbeatTimer = null;
    }
    client.ws.terminate();
    this.clients.delete(clientId);
  }

  /** Removes every client whose last activity is more than 5 minutes old. */
  cleanupInactiveClients() {
    const now = Date.now();
    const timeout = 300000; // 5-minute inactivity timeout
    for (const [clientId, client] of this.clients) {
      if (now - client.lastActivity > timeout) {
        console.log(`清理不活跃客户端: ${clientId}`);
        this.removeClient(clientId);
      }
    }
  }

  /**
   * Stops the periodic sweep and removes every client, releasing all timers
   * held by this instance.
   */
  shutdown() {
    if (this.cleanupTimer) {
      clearInterval(this.cleanupTimer);
      this.cleanupTimer = null;
    }
    for (const clientId of [...this.clients.keys()]) {
      this.removeClient(clientId);
    }
  }
}
"掌握WebSocket是构建现代实时Web应用的核心技能,双向通信让你的应用拥有更好的用户体验和交互性!"