Search K
Appearance
Appearance
📊 SEO元描述:2024年最新Node.js TCP编程教程,详解TCP服务器客户端、数据传输协议、连接管理。包含完整网络通信实现和性能优化,适合高级开发者掌握网络编程。
核心关键词:Node.js TCP编程2024、TCP服务器、TCP客户端、网络通信、数据传输协议、连接管理
长尾关键词:Node.js TCP怎么用、TCP编程原理、网络通信协议、TCP连接管理、Node.js网络编程
通过本节Node.js TCP编程,你将系统性掌握:
TCP(传输控制协议)是什么?这是互联网最重要的传输层协议之一。TCP提供可靠的、面向连接的数据传输服务,也是构建稳定网络应用的重要基础。
💡 学习建议:TCP适合需要可靠传输的应用,如文件传输、数据库连接、HTTP等协议都基于TCP
// 🎉 基础TCP服务器示例
const net = require('net');
// 创建TCP服务器
const server = net.createServer((socket) => {
console.log('客户端连接:', socket.remoteAddress + ':' + socket.remotePort);
// 发送欢迎消息
socket.write('欢迎连接到TCP服务器!\n');
// 监听客户端数据
socket.on('data', (data) => {
const message = data.toString().trim();
console.log('收到消息:', message);
// 回显消息
socket.write(`服务器收到: ${message}\n`);
});
// 监听连接关闭
socket.on('close', () => {
console.log('客户端断开连接');
});
// 监听连接错误
socket.on('error', (err) => {
console.error('连接错误:', err.message);
});
});
// 监听服务器错误
server.on('error', (err) => {
console.error('服务器错误:', err.message);
});
// 启动服务器
const PORT = 3000;
server.listen(PORT, () => {
console.log(`TCP服务器启动,监听端口 ${PORT}`);
});// 🎉 高级TCP服务器实现
const net = require('net');
const EventEmitter = require('events');
class TCPServer extends EventEmitter {
constructor(options = {}) {
super();
this.port = options.port || 3000;
this.host = options.host || '0.0.0.0';
this.maxConnections = options.maxConnections || 100;
this.timeout = options.timeout || 30000;
this.server = null;
this.connections = new Map();
this.connectionCount = 0;
}
start() {
this.server = net.createServer((socket) => {
this.handleConnection(socket);
});
this.server.maxConnections = this.maxConnections;
this.server.on('error', (err) => {
this.emit('error', err);
});
this.server.on('close', () => {
this.emit('close');
});
return new Promise((resolve, reject) => {
this.server.listen(this.port, this.host, (err) => {
if (err) {
reject(err);
} else {
console.log(`TCP服务器启动: ${this.host}:${this.port}`);
resolve();
}
});
});
}
handleConnection(socket) {
const connectionId = `${socket.remoteAddress}:${socket.remotePort}`;
this.connectionCount++;
// 设置连接超时
socket.setTimeout(this.timeout);
// 存储连接信息
const connectionInfo = {
id: connectionId,
socket: socket,
connectedAt: new Date(),
lastActivity: new Date()
};
this.connections.set(connectionId, connectionInfo);
console.log(`新连接: ${connectionId} (总连接数: ${this.connectionCount})`);
this.emit('connection', connectionInfo);
// 数据处理
socket.on('data', (data) => {
connectionInfo.lastActivity = new Date();
this.handleData(connectionInfo, data);
});
// 连接关闭
socket.on('close', () => {
this.handleDisconnection(connectionInfo);
});
// 连接错误
socket.on('error', (err) => {
console.error(`连接错误 ${connectionId}:`, err.message);
this.handleDisconnection(connectionInfo);
});
// 连接超时
socket.on('timeout', () => {
console.log(`连接超时: ${connectionId}`);
socket.destroy();
});
}
handleData(connectionInfo, data) {
try {
const message = data.toString().trim();
console.log(`收到数据 ${connectionInfo.id}:`, message);
this.emit('data', connectionInfo, message);
// 简单的命令处理
if (message.startsWith('PING')) {
connectionInfo.socket.write('PONG\n');
} else if (message.startsWith('TIME')) {
connectionInfo.socket.write(`TIME ${new Date().toISOString()}\n`);
} else if (message.startsWith('STATS')) {
const stats = this.getServerStats();
connectionInfo.socket.write(`STATS ${JSON.stringify(stats)}\n`);
} else {
connectionInfo.socket.write(`ECHO ${message}\n`);
}
} catch (err) {
console.error('数据处理错误:', err.message);
}
}
handleDisconnection(connectionInfo) {
this.connections.delete(connectionInfo.id);
this.connectionCount--;
console.log(`连接断开: ${connectionInfo.id} (剩余连接: ${this.connectionCount})`);
this.emit('disconnection', connectionInfo);
}
broadcast(message) {
for (const [id, conn] of this.connections) {
try {
conn.socket.write(message);
} catch (err) {
console.error(`广播失败 ${id}:`, err.message);
}
}
}
getServerStats() {
return {
connections: this.connectionCount,
uptime: process.uptime(),
memory: process.memoryUsage(),
timestamp: new Date().toISOString()
};
}
stop() {
return new Promise((resolve) => {
// 关闭所有连接
for (const [id, conn] of this.connections) {
conn.socket.destroy();
}
this.server.close(() => {
console.log('TCP服务器已关闭');
resolve();
});
});
}
}
// 使用高级TCP服务器
const server = new TCPServer({ port: 3000, maxConnections: 50 });
server.on('connection', (conn) => {
console.log(`新客户端连接: ${conn.id}`);
});
server.on('data', (conn, message) => {
console.log(`处理消息: ${message}`);
});
server.start().catch(console.error);TCP客户端用于连接TCP服务器并进行数据通信:
// 🎉 基础TCP客户端示例
const net = require('net');
// 创建TCP客户端
const client = new net.Socket();
// 连接到服务器
client.connect(3000, 'localhost', () => {
console.log('连接到服务器成功');
// 发送消息
client.write('Hello Server!');
});
// 接收服务器数据
client.on('data', (data) => {
console.log('收到服务器消息:', data.toString());
});
// 连接关闭
client.on('close', () => {
console.log('连接已关闭');
});
// 连接错误
client.on('error', (err) => {
console.error('连接错误:', err.message);
});// 🎉 高级TCP客户端实现
const net = require('net');
const EventEmitter = require('events');
class TCPClient extends EventEmitter {
constructor(options = {}) {
super();
this.host = options.host || 'localhost';
this.port = options.port || 3000;
this.reconnectInterval = options.reconnectInterval || 5000;
this.maxReconnectAttempts = options.maxReconnectAttempts || 10;
this.timeout = options.timeout || 10000;
this.socket = null;
this.connected = false;
this.reconnectAttempts = 0;
this.reconnectTimer = null;
this.heartbeatTimer = null;
}
connect() {
return new Promise((resolve, reject) => {
this.socket = new net.Socket();
// 设置连接超时
this.socket.setTimeout(this.timeout);
this.socket.connect(this.port, this.host, () => {
this.connected = true;
this.reconnectAttempts = 0;
console.log(`连接成功: ${this.host}:${this.port}`);
this.emit('connected');
this.startHeartbeat();
resolve();
});
this.socket.on('data', (data) => {
this.handleData(data);
});
this.socket.on('close', () => {
this.handleDisconnection();
});
this.socket.on('error', (err) => {
console.error('连接错误:', err.message);
this.emit('error', err);
if (!this.connected) {
reject(err);
}
});
this.socket.on('timeout', () => {
console.log('连接超时');
this.socket.destroy();
});
});
}
handleData(data) {
const message = data.toString().trim();
console.log('收到消息:', message);
// 处理心跳响应
if (message === 'PONG') {
this.emit('heartbeat');
return;
}
this.emit('data', message);
}
handleDisconnection() {
this.connected = false;
this.stopHeartbeat();
console.log('连接断开');
this.emit('disconnected');
// 自动重连
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.scheduleReconnect();
} else {
console.log('达到最大重连次数,停止重连');
this.emit('maxReconnectAttemptsReached');
}
}
scheduleReconnect() {
this.reconnectAttempts++;
console.log(`${this.reconnectInterval}ms后尝试第${this.reconnectAttempts}次重连`);
this.reconnectTimer = setTimeout(() => {
this.connect().catch(() => {
// 重连失败,会触发handleDisconnection继续重连
});
}, this.reconnectInterval);
}
startHeartbeat() {
this.heartbeatTimer = setInterval(() => {
if (this.connected) {
this.send('PING');
}
}, 30000); // 30秒心跳
}
stopHeartbeat() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
}
send(message) {
if (this.connected && this.socket) {
try {
this.socket.write(message + '\n');
return true;
} catch (err) {
console.error('发送消息失败:', err.message);
return false;
}
} else {
console.warn('连接未建立,无法发送消息');
return false;
}
}
disconnect() {
this.stopHeartbeat();
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
if (this.socket) {
this.socket.destroy();
}
this.connected = false;
console.log('主动断开连接');
}
}
// 使用高级TCP客户端
const client = new TCPClient({
host: 'localhost',
port: 3000,
reconnectInterval: 3000,
maxReconnectAttempts: 5
});
client.on('connected', () => {
console.log('客户端连接成功');
// 发送测试消息
setInterval(() => {
client.send(`测试消息 ${new Date().toISOString()}`);
}, 5000);
});
client.on('data', (message) => {
console.log('处理服务器消息:', message);
});
client.on('disconnected', () => {
console.log('客户端连接断开');
});
client.connect().catch(console.error);协议设计是TCP编程的重要环节:
// 🎉 自定义协议实现
class MessageProtocol {
constructor() {
this.HEADER_SIZE = 8; // 4字节长度 + 4字节类型
this.buffer = Buffer.alloc(0);
}
// 编码消息
encode(type, data) {
const payload = Buffer.from(JSON.stringify(data), 'utf8');
const header = Buffer.alloc(this.HEADER_SIZE);
// 写入数据长度(不包括头部)
header.writeUInt32BE(payload.length, 0);
// 写入消息类型
header.writeUInt32BE(type, 4);
return Buffer.concat([header, payload]);
}
// 解码消息
decode(data) {
this.buffer = Buffer.concat([this.buffer, data]);
const messages = [];
while (this.buffer.length >= this.HEADER_SIZE) {
// 读取消息长度
const length = this.buffer.readUInt32BE(0);
const totalLength = this.HEADER_SIZE + length;
// 检查是否有完整消息
if (this.buffer.length < totalLength) {
break;
}
// 读取消息类型
const type = this.buffer.readUInt32BE(4);
// 读取消息数据
const payload = this.buffer.slice(this.HEADER_SIZE, totalLength);
try {
const data = JSON.parse(payload.toString('utf8'));
messages.push({ type, data });
} catch (err) {
console.error('消息解析错误:', err.message);
}
// 移除已处理的消息
this.buffer = this.buffer.slice(totalLength);
}
return messages;
}
}
// 消息类型常量
const MessageTypes = {
HEARTBEAT: 1,
CHAT: 2,
FILE_TRANSFER: 3,
SYSTEM: 4
};
// 使用协议的服务器
class ProtocolServer extends TCPServer {
constructor(options) {
super(options);
this.protocol = new MessageProtocol();
}
handleData(connectionInfo, data) {
try {
const messages = this.protocol.decode(data);
messages.forEach(message => {
this.processMessage(connectionInfo, message);
});
} catch (err) {
console.error('协议处理错误:', err.message);
}
}
processMessage(connectionInfo, message) {
console.log(`收到消息 ${connectionInfo.id}:`, message);
switch (message.type) {
case MessageTypes.HEARTBEAT:
this.sendMessage(connectionInfo.socket, MessageTypes.HEARTBEAT, {
timestamp: Date.now()
});
break;
case MessageTypes.CHAT:
// 广播聊天消息
this.broadcastMessage(MessageTypes.CHAT, {
from: connectionInfo.id,
message: message.data.message,
timestamp: Date.now()
});
break;
case MessageTypes.SYSTEM:
this.handleSystemMessage(connectionInfo, message.data);
break;
}
}
sendMessage(socket, type, data) {
const encoded = this.protocol.encode(type, data);
socket.write(encoded);
}
broadcastMessage(type, data) {
const encoded = this.protocol.encode(type, data);
this.broadcast(encoded);
}
handleSystemMessage(connectionInfo, data) {
if (data.command === 'get_stats') {
this.sendMessage(connectionInfo.socket, MessageTypes.SYSTEM, {
command: 'stats_response',
stats: this.getServerStats()
});
}
}
}协议设计原则:
💼 最佳实践:在生产环境中,建议使用成熟的序列化协议如Protocol Buffers或MessagePack
通过本节Node.js TCP编程的学习,你已经掌握:
A: TCP提供可靠的、面向连接的传输,保证数据完整性;UDP是无连接的,传输速度快但不保证可靠性。
A: 使用长度前缀、分隔符或固定长度等方式定义消息边界,确保正确解析消息。
A: 有限制,主要受文件描述符数量、内存大小和系统配置影响,可以通过调优提高并发连接数。
A: 可以使用反向代理、DNS轮询、或在应用层实现连接分发算法。
A: 使用流式传输,分块发送,实现进度监控和断点续传功能。
// 问题:TCP连接没有正确关闭导致资源泄漏
// 解决:实现连接监控和自动清理
class ConnectionMonitor {
constructor() {
this.connections = new Map();
this.maxIdleTime = 300000; // 5分钟
setInterval(() => {
this.cleanupIdleConnections();
}, 60000); // 每分钟检查一次
}
addConnection(id, socket) {
this.connections.set(id, {
socket,
lastActivity: Date.now()
});
}
updateActivity(id) {
const conn = this.connections.get(id);
if (conn) {
conn.lastActivity = Date.now();
}
}
cleanupIdleConnections() {
const now = Date.now();
for (const [id, conn] of this.connections) {
if (now - conn.lastActivity > this.maxIdleTime) {
console.log(`清理空闲连接: ${id}`);
conn.socket.destroy();
this.connections.delete(id);
}
}
}
}// 问题:发送数据过快导致内存溢出
// 解决:实现背压控制
function writeWithBackpressure(socket, data) {
return new Promise((resolve, reject) => {
const canContinue = socket.write(data);
if (canContinue) {
resolve();
} else {
socket.once('drain', resolve);
socket.once('error', reject);
}
});
}"掌握TCP编程是构建可靠网络应用的基础,深入理解网络通信原理将让你的应用更加稳定和高效!"