Search K
Appearance
Appearance
📊 SEO元描述:2024年最新JavaScript数据可视化处理教程,详解数据获取清洗、实时数据更新、数据格式转换。包含完整数据处理架构,适合前端开发者掌握大屏可视化数据处理技术。
核心关键词:JavaScript数据可视化2024、数据处理架构、实时数据更新、数据清洗转换、前端数据可视化
长尾关键词:JavaScript数据可视化怎么做、实时数据更新怎么实现、数据清洗算法怎么写、可视化数据处理优化、前端大屏数据架构
通过本节JavaScript数据可视化数据处理,你将系统性掌握:
数据可视化处理是什么?这是构建专业数据大屏最核心的技术基础问题。数据可视化处理是基于数据管道架构的信息处理系统,也是现代数据大屏应用的技术支撑。
💡 架构原则:数据处理系统需要在实时性、准确性和性能之间找到最佳平衡点
构建完整的数据获取和清洗系统,实现多源数据的统一处理:
// Data processing manager
/**
 * Coordinates data sources, the processing pipeline, the cache, real-time
 * updates and performance monitoring.
 *
 * The collaborators instantiated here (DataPipeline, DataCache,
 * RealTimeDataManager, DataPerformanceMonitor, DataConnector and the
 * *DataSource classes) are referenced by name only and must be provided by
 * the surrounding code.
 */
class DataProcessingManager {
  constructor() {
    // Data source management
    this.dataSources = new Map();
    this.dataConnectors = new Map();

    // Processing pipeline
    this.processingPipeline = new DataPipeline();
    this.dataTransformers = new Map();
    this.dataValidators = new Map();

    // Cache
    this.dataCache = new DataCache();
    this.cacheStrategy = 'lru'; // lru, fifo, ttl

    // Real-time updates
    this.realTimeManager = new RealTimeDataManager();
    this.updateSubscribers = new Set();

    // Performance monitoring
    this.performanceMonitor = new DataPerformanceMonitor();

    // Configuration
    this.config = {
      maxCacheSize: 1000,
      cacheTTL: 300000, // 5 minutes
      batchSize: 100,
      retryAttempts: 3,
      timeoutMs: 10000,
      enableCompression: true
    };

    // A constructor cannot await, so keep the promise (callers may await
    // `ready`) and handle its rejection instead of leaving it floating.
    this.ready = this.initDataProcessing().catch((error) => {
      console.error('Failed to initialize data processing:', error);
    });
  }

  /**
   * Runs the start-up steps.
   * NOTE(review): setupDataSources, setupProcessingPipeline and
   * startPerformanceMonitoring are not defined in the class as shown here -
   * confirm they are supplied elsewhere.
   * @returns {Promise<void>}
   */
  async initDataProcessing() {
    await this.setupDataSources();
    this.setupProcessingPipeline();
    this.startPerformanceMonitoring();
  }

  /**
   * Registers a data source and creates its connector.
   * @param {string} sourceId - key under which the source is stored
   * @param {object} sourceConfig - passed to createDataSource
   * @returns {object} the created data source
   * @throws {Error} when sourceConfig.type is not supported
   */
  registerDataSource(sourceId, sourceConfig) {
    const dataSource = this.createDataSource(sourceConfig);
    this.dataSources.set(sourceId, dataSource);

    // Create the connector for this source
    const connector = new DataConnector(dataSource, this.config);
    this.dataConnectors.set(sourceId, connector);

    console.log(`Data source registered: ${sourceId}`);
    return dataSource;
  }

  /**
   * Instantiates the data source class matching config.type.
   * @param {object} config - must carry a `type` of rest/websocket/sse/file/mock
   * @returns {object} the new data source
   * @throws {Error} for any other type
   */
  createDataSource(config) {
    switch (config.type) {
      case 'rest':
        return new RestDataSource(config);
      case 'websocket':
        return new WebSocketDataSource(config);
      case 'sse':
        return new SSEDataSource(config);
      case 'file':
        return new FileDataSource(config);
      case 'mock':
        return new MockDataSource(config);
      default:
        throw new Error(`Unsupported data source type: ${config.type}`);
    }
  }

  /**
   * Returns processed data for a source, served from the cache when a fresh
   * entry exists, otherwise fetched through the connector and the pipeline.
   * @param {string} sourceId
   * @param {object} [params={}] - forwarded to the connector
   * @returns {Promise<*>} the processed data
   * @throws {Error} when the source is unknown; connector and pipeline errors
   *   are recorded, logged and rethrown
   */
  async fetchData(sourceId, params = {}) {
    const startTime = performance.now();
    try {
      // Check the cache first
      const cacheKey = this.generateCacheKey(sourceId, params);
      const cachedData = this.dataCache.get(cacheKey);
      if (cachedData && !this.isCacheExpired(cachedData)) {
        this.performanceMonitor.recordCacheHit(sourceId);
        return cachedData.data;
      }

      // Fetch from the data source
      const connector = this.dataConnectors.get(sourceId);
      if (!connector) {
        throw new Error(`Data source not found: ${sourceId}`);
      }
      const rawData = await connector.fetch(params);

      // Run the processing pipeline
      const processedData = await this.processingPipeline.process(rawData, sourceId);

      // Cache the processed result
      this.dataCache.set(cacheKey, {
        data: processedData,
        timestamp: Date.now(),
        ttl: this.config.cacheTTL
      });

      // The pipeline may return a single object or nothing at all, so the
      // record count must not assume an array.
      let recordCount = 0;
      if (Array.isArray(processedData)) {
        recordCount = processedData.length;
      } else if (processedData != null) {
        recordCount = 1;
      }
      const duration = performance.now() - startTime;
      this.performanceMonitor.recordDataFetch(sourceId, duration, recordCount);

      return processedData;
    } catch (error) {
      this.performanceMonitor.recordError(sourceId, error);
      console.error(`Failed to fetch data from ${sourceId}:`, error);
      throw error;
    }
  }

  /**
   * Fetches several sources concurrently; one failure does not affect the
   * others.
   * @param {Array<{sourceId: string, params?: object}>} requests
   * @returns {Promise<Array<{sourceId: string, data: *, error: *}>>} one entry
   *   per request, in request order
   */
  async fetchMultipleData(requests) {
    const settled = await Promise.allSettled(
      requests.map((request) => this.fetchData(request.sourceId, request.params))
    );
    return settled.map((outcome, index) => {
      const { sourceId } = requests[index];
      return outcome.status === 'fulfilled'
        ? { sourceId, data: outcome.value, error: null }
        : { sourceId, data: null, error: outcome.reason };
    });
  }

  /**
   * Starts real-time updates for a source.
   * NOTE(review): the RealTimeDataManager class in this file defines no
   * `startUpdates` method - confirm it is provided elsewhere.
   * @param {string} sourceId
   * @param {number} [updateInterval=5000]
   * @returns {*} whatever realTimeManager.startUpdates returns
   */
  startRealTimeUpdates(sourceId, updateInterval = 5000) {
    return this.realTimeManager.startUpdates(sourceId, updateInterval, (data) => {
      this.handleRealTimeData(sourceId, data);
    });
  }

  /**
   * Processes pushed data, refreshes the parameter-less cache entry of the
   * source and notifies subscribers. Errors are logged, not thrown.
   * @param {string} sourceId
   * @param {*} rawData
   * @returns {Promise<void>}
   */
  async handleRealTimeData(sourceId, rawData) {
    try {
      const processedData = await this.processingPipeline.process(rawData, sourceId);

      // Update the cache
      const cacheKey = this.generateCacheKey(sourceId, {});
      this.dataCache.set(cacheKey, {
        data: processedData,
        timestamp: Date.now(),
        ttl: this.config.cacheTTL
      });

      // Notify subscribers
      this.notifySubscribers(sourceId, processedData);
    } catch (error) {
      console.error(`Failed to process real-time data from ${sourceId}:`, error);
    }
  }

  /**
   * Subscribes to processed updates of a source.
   * @param {string} sourceId
   * @param {function(*): void} callback
   * @returns {function(): void} call it to unsubscribe
   */
  subscribe(sourceId, callback) {
    const subscription = {
      sourceId,
      callback,
      id: this.generateSubscriptionId()
    };
    this.updateSubscribers.add(subscription);
    return () => {
      this.updateSubscribers.delete(subscription);
    };
  }

  /**
   * Calls every subscriber of the source; a throwing callback is logged and
   * does not stop the remaining subscribers.
   * @param {string} sourceId
   * @param {*} data
   */
  notifySubscribers(sourceId, data) {
    this.updateSubscribers.forEach((subscription) => {
      if (subscription.sourceId !== sourceId) {
        return;
      }
      try {
        subscription.callback(data);
      } catch (error) {
        console.error('Subscriber callback error:', error);
      }
    });
  }

  /**
   * Builds the cache key for a source/parameter combination.
   * The parameters are serialized as JSON with object keys sorted, so equal
   * parameters yield the same key regardless of property order. No base64
   * step is used: btoa throws on characters outside Latin-1.
   * @param {string} sourceId
   * @param {object} params
   * @returns {string}
   */
  generateCacheKey(sourceId, params) {
    const sortObjectKeys = (key, value) => {
      if (value === null || typeof value !== 'object' || Array.isArray(value)) {
        return value;
      }
      const ordered = {};
      for (const name of Object.keys(value).sort()) {
        ordered[name] = value[name];
      }
      return ordered;
    };
    const paramString = JSON.stringify(params ?? {}, sortObjectKeys);
    return `${sourceId}:${paramString}`;
  }

  /**
   * @param {{timestamp: number, ttl: number}} cachedItem
   * @returns {boolean} true when the entry is older than its ttl
   */
  isCacheExpired(cachedItem) {
    return Date.now() - cachedItem.timestamp > cachedItem.ttl;
  }

  /**
   * @returns {string} an id of the form `sub_<timestamp>_<random>`
   */
  generateSubscriptionId() {
    return `sub_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }
}
// Data processing pipeline
/**
 * Ordered chain of middlewares and processors applied to incoming data.
 * Middlewares always run and receive the source id; processors run only
 * when their canProcess(sourceId) returns a truthy value.
 */
class DataPipeline {
  constructor() {
    this.processors = [];
    this.middlewares = [];
  }

  /**
   * Appends a processor (an object with canProcess and process).
   * @param {object} processor
   */
  addProcessor(processor) {
    this.processors.push(processor);
  }

  /**
   * Appends a middleware (an object with process).
   * @param {object} middleware
   */
  addMiddleware(middleware) {
    this.middlewares.push(middleware);
  }

  /**
   * Runs the data through all middlewares, then through every applicable
   * processor, each step awaiting the previous one.
   * @param {*} data
   * @param {string} sourceId
   * @returns {Promise<*>} output of the last step, or `data` when nothing ran
   */
  async process(data, sourceId) {
    let current = data;

    // Middlewares first, in registration order
    for (let i = 0; i < this.middlewares.length; i += 1) {
      current = await this.middlewares[i].process(current, sourceId);
    }

    // Then the processors that accept this source
    for (let i = 0; i < this.processors.length; i += 1) {
      const stage = this.processors[i];
      if (!stage.canProcess(sourceId)) {
        continue;
      }
      current = await stage.process(current);
    }

    return current;
  }
}
// Data cleaning processor
/**
 * Pipeline processor that cleans arrays (null removal, de-duplication,
 * IQR outlier removal, missing-value filling, empty-object removal) and
 * single objects (null removal, string normalization).
 *
 * Every step can be switched off through the constructor config.
 */
class DataCleaningProcessor {
  /**
   * @param {object} [config] - overrides for the boolean step switches
   */
  constructor(config = {}) {
    this.config = {
      removeNulls: true,
      removeDuplicates: true,
      handleOutliers: true,
      fillMissingValues: true,
      normalizeStrings: true,
      validateTypes: true,
      ...config
    };
  }

  /**
   * @param {string} sourceId
   * @returns {boolean} always true: this processor applies to every source
   */
  canProcess(sourceId) {
    return true;
  }

  /**
   * Cleans a shallow copy of the input; the input itself is not modified.
   * @param {*} data - array or object
   * @returns {Promise<Array|object>}
   */
  async process(data) {
    if (Array.isArray(data)) {
      return this.cleanArrayData([...data]);
    }
    return this.cleanObjectData({ ...data });
  }

  /**
   * Applies the enabled array steps in a fixed order: nulls, duplicates,
   * outliers, missing values, type validation.
   * @param {Array} data
   * @returns {Promise<Array>}
   */
  async cleanArrayData(data) {
    let cleaned = data;

    // Remove null/undefined items
    if (this.config.removeNulls) {
      cleaned = cleaned.filter((item) => item != null);
    }
    // De-duplicate
    if (this.config.removeDuplicates) {
      cleaned = this.removeDuplicates(cleaned);
    }
    // Remove outliers
    if (this.config.handleOutliers) {
      cleaned = this.handleOutliers(cleaned);
    }
    // Fill missing values
    if (this.config.fillMissingValues) {
      cleaned = this.fillMissingValues(cleaned);
    }
    // Validate item types
    if (this.config.validateTypes) {
      cleaned = this.validateDataTypes(cleaned);
    }
    return cleaned;
  }

  /**
   * Copies an object, dropping null/undefined values and normalizing string
   * values when the corresponding switches are on.
   * @param {object} data
   * @returns {Promise<object>}
   */
  async cleanObjectData(data) {
    const cleaned = {};
    for (const [key, value] of Object.entries(data)) {
      // Skip empty values
      if (this.config.removeNulls && value == null) {
        continue;
      }
      // Normalize strings
      if (this.config.normalizeStrings && typeof value === 'string') {
        cleaned[key] = this.normalizeString(value);
      } else {
        cleaned[key] = value;
      }
    }
    return cleaned;
  }

  /**
   * Keeps the first occurrence of each item; items are compared by their
   * JSON serialization, so property order matters.
   * @param {Array} data
   * @returns {Array}
   */
  removeDuplicates(data) {
    const seen = new Set();
    return data.filter((item) => {
      const key = JSON.stringify(item);
      if (seen.has(key)) {
        return false;
      }
      seen.add(key);
      return true;
    });
  }

  /**
   * Removes IQR outliers. A plain numeric array is treated as one series;
   * for an array of records every numeric field is its own series. Bounds
   * are computed once per series. Only numeric values can be outliers, so
   * rows with a missing value survive to be filled later.
   * @param {Array} data
   * @returns {Array}
   */
  handleOutliers(data) {
    if (!Array.isArray(data) || data.length === 0) return data;

    const firstValue = data.find((item) => item != null);
    if (typeof firstValue === 'number') {
      const bounds = this.computeIqrBounds(data);
      return data.filter((item) => !this.isOutsideBounds(item, bounds));
    }

    const boundsByField = new Map();
    this.collectFields(data, 'number').forEach((field) => {
      const series = data.map((item) => (this.isRecord(item) ? item[field] : undefined));
      const bounds = this.computeIqrBounds(series);
      if (bounds) {
        boundsByField.set(field, bounds);
      }
    });

    return data.filter((item) => {
      if (!this.isRecord(item)) return true;
      for (const [field, bounds] of boundsByField) {
        if (this.isOutsideBounds(item[field], bounds)) return false;
      }
      return true;
    });
  }

  /**
   * IQR outlier test for one value against the series found in `data`.
   * @param {*} value
   * @param {Array} data - numbers, or objects read through `field`
   * @param {string} field
   * @returns {boolean} false when the series has fewer than four numbers
   */
  isOutlier(value, data, field) {
    const series = data.map((item) =>
      (item !== null && typeof item === 'object' ? item[field] : item)
    );
    return this.isOutsideBounds(value, this.computeIqrBounds(series));
  }

  /**
   * Computes the 1.5 * IQR fences of the finite numbers in `values`.
   * @param {Array} values
   * @returns {{lower: number, upper: number}|null} null with fewer than four
   *   finite numbers
   */
  computeIqrBounds(values) {
    // Non-finite numbers would break the numeric sort comparator
    const sorted = values.filter((v) => Number.isFinite(v)).sort((a, b) => a - b);
    if (sorted.length < 4) return null;
    const q1 = sorted[Math.floor(sorted.length * 0.25)];
    const q3 = sorted[Math.floor(sorted.length * 0.75)];
    const iqr = q3 - q1;
    return { lower: q1 - 1.5 * iqr, upper: q3 + 1.5 * iqr };
  }

  /**
   * @param {*} value
   * @param {{lower: number, upper: number}|null} bounds
   * @returns {boolean} true only for a number outside the fences
   */
  isOutsideBounds(value, bounds) {
    // The typeof check stops null from being coerced to 0 and discarded
    return bounds !== null && typeof value === 'number' &&
      (value < bounds.lower || value > bounds.upper);
  }

  /**
   * Fills missing numeric fields with the field mean and missing or empty
   * string fields with the most frequent non-empty value ('Unknown' when
   * there is none). Items that are not plain records are returned as-is.
   * @param {Array} data
   * @returns {Array}
   */
  fillMissingValues(data) {
    if (!Array.isArray(data) || data.length === 0) return data;

    const numericFields = this.collectFields(data, 'number');
    const stringFields = this.collectFields(data, 'string');
    const records = data.filter((item) => this.isRecord(item));

    // Mean of each numeric field
    const numericMeans = {};
    numericFields.forEach((field) => {
      const values = records
        .map((item) => item[field])
        .filter((v) => typeof v === 'number' && !Number.isNaN(v));
      numericMeans[field] = values.length > 0
        ? values.reduce((sum, v) => sum + v, 0) / values.length
        : 0;
    });

    // Mode of each string field; '' is itself a missing value
    const stringModes = {};
    stringFields.forEach((field) => {
      const values = records
        .map((item) => item[field])
        .filter((v) => typeof v === 'string' && v !== '');
      stringModes[field] = this.getMode(values) || 'Unknown';
    });

    return data.map((item) => {
      if (!this.isRecord(item)) return item;
      const filled = { ...item };
      numericFields.forEach((field) => {
        if (filled[field] == null) {
          filled[field] = numericMeans[field];
        }
      });
      stringFields.forEach((field) => {
        if (filled[field] == null || filled[field] === '') {
          filled[field] = stringModes[field];
        }
      });
      return filled;
    });
  }

  /**
   * @param {*} value
   * @returns {boolean} true for a non-null, non-array object
   */
  isRecord(value) {
    return value !== null && typeof value === 'object' && !Array.isArray(value);
  }

  /**
   * Lists the fields whose first non-null value anywhere in the data set has
   * the given typeof. Scanning all rows keeps a null in the first row from
   * hiding a field.
   * @param {Array} data
   * @param {string} type - a typeof result such as 'number' or 'string'
   * @returns {string[]} field names in order of first appearance
   */
  collectFields(data, type) {
    const typeByField = new Map();
    for (const item of data) {
      if (!this.isRecord(item)) continue;
      for (const [key, value] of Object.entries(item)) {
        if (value != null && !typeByField.has(key)) {
          typeByField.set(key, typeof value);
        }
      }
    }
    return [...typeByField]
      .filter(([, fieldType]) => fieldType === type)
      .map(([key]) => key);
  }

  /**
   * @param {*} sample
   * @returns {string[]} keys of `sample` holding numbers; [] for non-objects
   */
  getNumericFields(sample) {
    if (sample === null || typeof sample !== 'object') return [];
    return Object.keys(sample).filter((key) => typeof sample[key] === 'number');
  }

  /**
   * @param {*} sample
   * @returns {string[]} keys of `sample` holding strings; [] for non-objects
   */
  getStringFields(sample) {
    if (sample === null || typeof sample !== 'object') return [];
    return Object.keys(sample).filter((key) => typeof sample[key] === 'string');
  }

  /**
   * @param {Array} values
   * @returns {*} the most frequent value (first to reach the top count), or
   *   null for an empty list
   */
  getMode(values) {
    // A Map avoids clashes with inherited object keys such as 'constructor'
    const counts = new Map();
    let maxCount = 0;
    let mode = null;
    for (const value of values) {
      const count = (counts.get(value) || 0) + 1;
      counts.set(value, count);
      if (count > maxCount) {
        maxCount = count;
        mode = value;
      }
    }
    return mode;
  }

  /**
   * Trims, lower-cases and collapses whitespace runs to single spaces.
   * @param {string} str
   * @returns {string}
   */
  normalizeString(str) {
    return str.trim().toLowerCase().replace(/\s+/g, ' ');
  }

  /**
   * Drops objects (including arrays) that have no own keys; everything else
   * is kept.
   * @param {Array} data
   * @returns {Array}
   */
  validateDataTypes(data) {
    return data.filter((item) => {
      if (typeof item === 'object' && item !== null) {
        return Object.keys(item).length > 0;
      }
      return true;
    });
  }
}
// Data format converter
/**
 * Converts text between CSV, JSON and XML by parsing the source format into
 * plain objects and serializing them in the target format.
 *
 * XML parsing relies on the DOMParser global; everything else is plain
 * JavaScript.
 */
class DataFormatConverter {
  constructor() {
    this.converters = new Map();
    this.initConverters();
  }

  /**
   * Registers the parse/stringify pair of each supported format.
   */
  initConverters() {
    // CSV
    this.converters.set('csv', {
      parse: this.parseCSV.bind(this),
      stringify: this.stringifyCSV.bind(this)
    });
    // JSON
    this.converters.set('json', {
      parse: this.parseJSON.bind(this),
      stringify: this.stringifyJSON.bind(this)
    });
    // XML
    this.converters.set('xml', {
      parse: this.parseXML.bind(this),
      stringify: this.stringifyXML.bind(this)
    });
  }

  /**
   * Converts `data` from one registered format to another.
   * @param {string} data - text in `fromFormat`
   * @param {string} fromFormat - 'csv', 'json' or 'xml'
   * @param {string} toFormat - 'csv', 'json' or 'xml'
   * @returns {string} text in `toFormat`
   * @throws {Error} when either format is not registered, or parsing fails
   */
  convert(data, fromFormat, toFormat) {
    const fromConverter = this.converters.get(fromFormat);
    const toConverter = this.converters.get(toFormat);
    if (!fromConverter || !toConverter) {
      throw new Error(`Unsupported format conversion: ${fromFormat} -> ${toFormat}`);
    }
    // Parse the source format, then serialize to the target format
    const parsedData = fromConverter.parse(data);
    return toConverter.stringify(parsedData);
  }

  /**
   * Parses CSV text whose first record is the header row.
   * Quoted fields may contain commas, escaped quotes ("") and line breaks.
   * Blank records are skipped and missing cells become null.
   * @param {string} csvText
   * @returns {object[]} one object per data record, keyed by header
   */
  parseCSV(csvText) {
    const records = this.splitCSVRecords(csvText);
    if (records.length === 0) return [];

    const headers = this.parseCSVLine(records[0]);
    return records.slice(1).map((record) => {
      const values = this.parseCSVLine(record);
      const row = {};
      headers.forEach((header, index) => {
        // Quotes were already consumed by parseCSVLine; any quote left in
        // the value is literal content and must not be stripped again.
        row[header] = this.convertValue(values[index], false);
      });
      return row;
    });
  }

  /**
   * Splits CSV text into records at line breaks outside quoted fields.
   * @param {string} csvText
   * @returns {string[]} non-blank records
   */
  splitCSVRecords(csvText) {
    const records = [];
    let current = '';
    let inQuotes = false;
    for (const char of csvText) {
      if (char === '"') {
        // An escaped "" toggles twice, leaving the state unchanged
        inQuotes = !inQuotes;
      }
      if (char === '\n' && !inQuotes) {
        records.push(current);
        current = '';
      } else {
        current += char;
      }
    }
    records.push(current);
    return records.filter((record) => record.trim() !== '');
  }

  /**
   * Splits one CSV record into trimmed field strings. Surrounding quotes are
   * removed and a doubled quote inside a quoted field yields one literal
   * quote.
   * @param {string} line
   * @returns {string[]}
   */
  parseCSVLine(line) {
    const result = [];
    let current = '';
    let inQuotes = false;
    for (let i = 0; i < line.length; i++) {
      const char = line[i];
      if (char === '"') {
        if (inQuotes && line[i + 1] === '"') {
          current += '"';
          i++; // consume the second quote of the escape pair
        } else {
          inQuotes = !inQuotes;
        }
      } else if (char === ',' && !inQuotes) {
        result.push(current.trim());
        current = '';
      } else {
        current += char;
      }
    }
    result.push(current.trim());
    return result;
  }

  /**
   * Converts a cell string to null, number, boolean or string.
   * @param {*} value - cell text; non-strings are returned unchanged
   * @param {boolean} [stripQuotes=true] - remove one pair of enclosing
   *   double quotes before converting
   * @returns {*} null for empty input
   */
  convertValue(value, stripQuotes = true) {
    if (value == null || value === '') return null;
    if (typeof value !== 'string') return value;

    let text = value;
    if (stripQuotes) {
      text = text.replace(/^"(.*)"$/, '$1');
    }
    // Try a number
    if (/^-?\d+\.?\d*$/.test(text)) {
      return parseFloat(text);
    }
    // Try a boolean
    const lowered = text.toLowerCase();
    if (lowered === 'true') return true;
    if (lowered === 'false') return false;
    return text;
  }

  /**
   * @param {string} jsonText
   * @returns {*} the parsed value
   * @throws {Error} 'Invalid JSON format', with the parser error as `cause`
   */
  parseJSON(jsonText) {
    try {
      return JSON.parse(jsonText);
    } catch (error) {
      throw new Error('Invalid JSON format', { cause: error });
    }
  }

  /**
   * @param {*} data
   * @returns {string} JSON indented with two spaces
   */
  stringifyJSON(data) {
    return JSON.stringify(data, null, 2);
  }

  /**
   * Serializes an array of records as CSV. The columns are the keys of the
   * first record; null/undefined cells are written as empty.
   * @param {object[]} data
   * @returns {string} '' for a non-array or empty array
   */
  stringifyCSV(data) {
    if (!Array.isArray(data) || data.length === 0) return '';
    const headers = Object.keys(data[0] ?? {});
    const csvLines = [headers.map((header) => this.escapeCSVField(header)).join(',')];
    data.forEach((row) => {
      csvLines.push(headers.map((header) => this.escapeCSVField(row?.[header])).join(','));
    });
    return csvLines.join('\n');
  }

  /**
   * Renders one CSV cell, quoting it when it contains a comma, a quote or a
   * line break.
   * @param {*} value
   * @returns {string}
   */
  escapeCSVField(value) {
    if (value == null) return '';
    const text = String(value);
    if (/[",\r\n]/.test(text)) {
      return `"${text.replace(/"/g, '""')}"`;
    }
    return text;
  }

  /**
   * Parses XML text into nested objects (simplified mapping, see
   * xmlToObject). Requires the DOMParser global.
   * @param {string} xmlText
   * @returns {object}
   * @throws {Error} 'Invalid XML format' when the parser reports an error
   */
  parseXML(xmlText) {
    const parser = new DOMParser();
    const xmlDoc = parser.parseFromString(xmlText, 'text/xml');
    if (xmlDoc.getElementsByTagName('parsererror').length > 0) {
      throw new Error('Invalid XML format');
    }
    return this.xmlToObject(xmlDoc.documentElement);
  }

  /**
   * Maps a DOM element to an object: attributes become '@name' keys,
   * non-blank text becomes '#text', child elements become keys named after
   * the element, and repeated child names are collected into an array.
   * @param {object} xmlNode - element exposing attributes and childNodes
   * @returns {object}
   */
  xmlToObject(xmlNode) {
    // DOM nodeType values, spelled out so the global Node is not required
    const ELEMENT_NODE = 1;
    const TEXT_NODE = 3;
    const result = {};

    // Attributes
    if (xmlNode.attributes) {
      for (let i = 0; i < xmlNode.attributes.length; i++) {
        const attr = xmlNode.attributes[i];
        result[`@${attr.name}`] = attr.value;
      }
    }

    // Child nodes
    if (xmlNode.childNodes) {
      for (let i = 0; i < xmlNode.childNodes.length; i++) {
        const child = xmlNode.childNodes[i];
        if (child.nodeType === TEXT_NODE) {
          const text = child.textContent.trim();
          if (text) {
            result['#text'] = text;
          }
        } else if (child.nodeType === ELEMENT_NODE) {
          const childObj = this.xmlToObject(child);
          // Own-property check: names like 'constructor' exist on every object
          if (Object.prototype.hasOwnProperty.call(result, child.nodeName)) {
            if (!Array.isArray(result[child.nodeName])) {
              result[child.nodeName] = [result[child.nodeName]];
            }
            result[child.nodeName].push(childObj);
          } else {
            result[child.nodeName] = childObj;
          }
        }
      }
    }
    return result;
  }

  /**
   * Serializes data as XML under a <root> element (simplified).
   * @param {*} data
   * @returns {string}
   */
  stringifyXML(data) {
    return this.objectToXML(data, 'root');
  }

  /**
   * Serializes a value as the element <rootName>.
   * - '@' keys (attributes) are skipped, as in the simplified format.
   * - '#text' is written as the element's text content.
   * - An array property repeats the property's element once per item.
   * - An array passed directly is written as <item> children.
   * - Text is XML-escaped; null/undefined produce empty content.
   * @param {*} obj
   * @param {string} rootName
   * @returns {string}
   */
  objectToXML(obj, rootName) {
    if (Array.isArray(obj)) {
      const items = obj.map((item) => this.objectToXML(item, 'item')).join('');
      return `<${rootName}>${items}</${rootName}>`;
    }
    if (obj === null || typeof obj !== 'object') {
      return `<${rootName}>${this.escapeXML(obj)}</${rootName}>`;
    }

    let xml = `<${rootName}>`;
    for (const [key, value] of Object.entries(obj)) {
      if (key.startsWith('@')) continue; // skip attributes
      if (key === '#text') {
        xml += this.escapeXML(value);
      } else if (Array.isArray(value)) {
        value.forEach((item) => {
          xml += this.objectToXML(item, key);
        });
      } else {
        xml += this.objectToXML(value, key);
      }
    }
    xml += `</${rootName}>`;
    return xml;
  }

  /**
   * Escapes the characters that are special in XML text content.
   * @param {*} value
   * @returns {string} '' for null/undefined
   */
  escapeXML(value) {
    if (value == null) return '';
    return String(value)
      .replace(/&/g, '&amp;')
      .replace(/</g, '&lt;')
      .replace(/>/g, '&gt;');
  }
}

// Real-time update mechanism: WebSocket, Server-Sent Events and similar technologies are used to push and synchronize data in real time:
// Real-time data manager
/**
 * Manages WebSocket and Server-Sent Events connections: channel
 * subscriptions, heartbeats, and reconnection with exponential backoff.
 *
 * Incoming messages are JSON objects with a `type` of 'data', 'heartbeat'
 * or 'subscription'. Connection status changes are announced through a
 * 'connection-status-change' CustomEvent on `document` when one exists.
 *
 * Only one callback is kept per (connection, channel); subscribing again to
 * the same channel replaces the earlier callback.
 */
class RealTimeDataManager {
  constructor() {
    this.connections = new Map();
    this.subscriptions = new Map();
    this.reconnectAttempts = new Map();
    this.maxReconnectAttempts = 5;
    this.reconnectDelay = 1000;
    this.config = {
      heartbeatInterval: 30000,
      connectionTimeout: 10000,
      maxMessageSize: 1024 * 1024, // 1MB
      enableCompression: true
    };
  }

  /**
   * Opens a WebSocket connection and registers it.
   * @param {string} url
   * @param {string[]} [protocols=[]] - sub-protocols, reused on reconnect
   * @returns {string} the connection id
   */
  createWebSocketConnection(url, protocols = []) {
    const connectionId = this.generateConnectionId();
    const ws = new WebSocket(url, protocols);
    const connection = {
      id: connectionId,
      type: 'websocket',
      socket: ws,
      url,
      protocols,
      status: 'connecting',
      lastHeartbeat: Date.now(),
      subscriptions: new Set()
    };
    this.setupWebSocketHandlers(connection);
    this.connections.set(connectionId, connection);
    return connectionId;
  }

  /**
   * Wires the event handlers of connection.socket. Events from a socket
   * that has since been replaced by a reconnect are ignored.
   * @param {object} connection
   */
  setupWebSocketHandlers(connection) {
    const ws = connection.socket;
    const isCurrentSocket = () => connection.socket === ws;

    ws.onopen = () => {
      if (!isCurrentSocket()) return;
      connection.status = 'connected';
      // Restart the heartbeat clock so a reconnected socket is not judged
      // by the silence of its predecessor.
      connection.lastHeartbeat = Date.now();
      this.resetReconnectAttempts(connection.id);
      this.startHeartbeat(connection.id);
      console.log(`WebSocket connected: ${connection.url}`);
      this.notifyConnectionStatus(connection.id, 'connected');
      // Send subscriptions made while connecting and replay them after a
      // reconnect.
      connection.subscriptions.forEach((channel) => {
        this.sendSubscriptionRequest(connection.id, channel, 'subscribe');
      });
    };

    ws.onmessage = (event) => {
      if (!isCurrentSocket()) return;
      this.handleMessage(connection.id, event.data);
    };

    ws.onclose = (event) => {
      if (!isCurrentSocket()) return;
      connection.status = 'disconnected';
      this.stopHeartbeat(connection.id);
      console.log(`WebSocket disconnected: ${connection.url}`, event);
      this.notifyConnectionStatus(connection.id, 'disconnected');
      // Reconnect automatically after an unclean close
      if (!event.wasClean) {
        this.attemptReconnect(connection.id);
      }
    };

    ws.onerror = (error) => {
      if (!isCurrentSocket()) return;
      console.error(`WebSocket error: ${connection.url}`, error);
      this.notifyConnectionStatus(connection.id, 'error', error);
    };
  }

  /**
   * Opens a Server-Sent Events connection and registers it.
   * @param {string} url
   * @returns {string} the connection id
   */
  createSSEConnection(url) {
    const connectionId = this.generateConnectionId();
    const eventSource = new EventSource(url);
    const connection = {
      id: connectionId,
      type: 'sse',
      eventSource,
      url,
      status: 'connecting',
      subscriptions: new Set()
    };
    this.setupSSEHandlers(connection);
    this.connections.set(connectionId, connection);
    return connectionId;
  }

  /**
   * Wires the event handlers of connection.eventSource.
   * @param {object} connection
   */
  setupSSEHandlers(connection) {
    const es = connection.eventSource;
    const isCurrentSource = () => connection.eventSource === es;

    es.onopen = () => {
      if (!isCurrentSource()) return;
      connection.status = 'connected';
      this.resetReconnectAttempts(connection.id);
      console.log(`SSE connected: ${connection.url}`);
      this.notifyConnectionStatus(connection.id, 'connected');
    };

    es.onmessage = (event) => {
      if (!isCurrentSource()) return;
      this.handleMessage(connection.id, event.data);
    };

    es.onerror = (error) => {
      if (!isCurrentSource()) return;
      connection.status = 'error';
      console.error(`SSE error: ${connection.url}`, error);
      this.notifyConnectionStatus(connection.id, 'error', error);
      // EventSource retries on its own; reflect that in the status
      setTimeout(() => {
        if (isCurrentSource() && es.readyState === EventSource.CONNECTING) {
          connection.status = 'connecting';
        }
      }, 1000);
    };
  }

  /**
   * Parses an incoming message and dispatches it by `type`. Malformed JSON
   * and handler failures are logged separately and never thrown.
   * @param {string} connectionId
   * @param {string} data - raw JSON text
   */
  handleMessage(connectionId, data) {
    let message;
    try {
      message = JSON.parse(data);
    } catch (error) {
      console.error('Failed to parse message:', error);
      return;
    }

    const connection = this.connections.get(connectionId);
    if (!connection) return;

    // Any message counts as a sign of life
    connection.lastHeartbeat = Date.now();

    try {
      switch (message?.type) {
        case 'data':
          this.handleDataMessage(connectionId, message);
          break;
        case 'heartbeat':
          this.handleHeartbeat(connectionId, message);
          break;
        case 'subscription':
          this.handleSubscriptionMessage(connectionId, message);
          break;
        default:
          console.warn('Unknown message type:', message?.type);
      }
    } catch (error) {
      console.error('Failed to handle message:', error);
    }
  }

  /**
   * Delivers a data message to the subscriber of its channel, if any.
   * @param {string} connectionId
   * @param {{channel: string, data: *, timestamp: *}} message
   */
  handleDataMessage(connectionId, message) {
    const { channel, data, timestamp } = message;
    const subscription = this.subscriptions.get(`${connectionId}:${channel}`);
    if (!subscription || !subscription.callback) return;
    try {
      subscription.callback(data, timestamp);
    } catch (error) {
      console.error('Subscription callback error:', error);
    }
  }

  /**
   * Hook for heartbeat messages. handleMessage has already refreshed
   * lastHeartbeat, so nothing more is required here.
   * @param {string} connectionId
   * @param {object} message
   */
  handleHeartbeat(connectionId, message) {}

  /**
   * Hook for 'subscription' messages coming from the server.
   * NOTE(review): their payload is not defined anywhere in this file, so
   * they are accepted and ignored - confirm against the server protocol.
   * @param {string} connectionId
   * @param {object} message
   */
  handleSubscriptionMessage(connectionId, message) {}

  /**
   * Subscribes to a channel. The request is sent immediately on an open
   * WebSocket; otherwise it is sent when the socket opens.
   * @param {string} connectionId
   * @param {string} channel
   * @param {function(*, *): void} callback - receives (data, timestamp)
   * @returns {function(): void} call it to unsubscribe
   * @throws {Error} 'Connection not found'
   */
  subscribe(connectionId, channel, callback) {
    const connection = this.connections.get(connectionId);
    if (!connection) {
      throw new Error('Connection not found');
    }
    this.subscriptions.set(`${connectionId}:${channel}`, {
      connectionId,
      channel,
      callback,
      createdAt: Date.now()
    });
    connection.subscriptions.add(channel);

    // Send the subscribe request
    this.sendSubscriptionRequest(connectionId, channel, 'subscribe');

    // Return the unsubscribe function
    return () => {
      this.unsubscribe(connectionId, channel);
    };
  }

  /**
   * Removes a channel subscription and tells the server when possible.
   * @param {string} connectionId
   * @param {string} channel
   */
  unsubscribe(connectionId, channel) {
    const connection = this.connections.get(connectionId);
    if (connection) {
      connection.subscriptions.delete(channel);
      this.sendSubscriptionRequest(connectionId, channel, 'unsubscribe');
    }
    this.subscriptions.delete(`${connectionId}:${channel}`);
  }

  /**
   * Sends a subscribe/unsubscribe request; does nothing unless the
   * connection is currently connected.
   * @param {string} connectionId
   * @param {string} channel
   * @param {string} action - 'subscribe' or 'unsubscribe'
   */
  sendSubscriptionRequest(connectionId, channel, action) {
    const connection = this.connections.get(connectionId);
    if (!connection || connection.status !== 'connected') return;
    this.sendMessage(connectionId, {
      type: 'subscription',
      action,
      channel,
      timestamp: Date.now()
    });
  }

  /**
   * Sends a JSON message over a connected WebSocket.
   * @param {string} connectionId
   * @param {object} message
   * @returns {boolean} true only when the message was handed to the socket;
   *   false for unknown, unconnected or SSE (receive-only) connections and
   *   when sending throws
   */
  sendMessage(connectionId, message) {
    const connection = this.connections.get(connectionId);
    if (!connection || connection.status !== 'connected') return false;
    if (connection.type !== 'websocket') return false; // SSE is one-way
    try {
      connection.socket.send(JSON.stringify(message));
      return true;
    } catch (error) {
      console.error('Failed to send message:', error);
      return false;
    }
  }

  /**
   * Starts the periodic heartbeat: each tick either sends a heartbeat or,
   * after two intervals without any incoming message, triggers a reconnect.
   * @param {string} connectionId
   */
  startHeartbeat(connectionId) {
    // Never keep two timers for one connection
    this.stopHeartbeat(connectionId);

    const heartbeatId = setInterval(() => {
      const connection = this.connections.get(connectionId);
      if (!connection || connection.status !== 'connected') {
        clearInterval(heartbeatId);
        return;
      }

      // Check for heartbeat timeout
      const timeSinceLastHeartbeat = Date.now() - connection.lastHeartbeat;
      if (timeSinceLastHeartbeat > this.config.heartbeatInterval * 2) {
        console.warn('Heartbeat timeout, attempting reconnect');
        clearInterval(heartbeatId);
        this.attemptReconnect(connectionId);
        return;
      }

      // Send a heartbeat
      this.sendMessage(connectionId, {
        type: 'heartbeat',
        timestamp: Date.now()
      });
    }, this.config.heartbeatInterval);

    const connection = this.connections.get(connectionId);
    if (connection) {
      connection.heartbeatId = heartbeatId;
    } else {
      // No connection to own the timer: do not leave it running
      clearInterval(heartbeatId);
    }
  }

  /**
   * Stops the heartbeat timer of a connection, if one is running.
   * @param {string} connectionId
   */
  stopHeartbeat(connectionId) {
    const connection = this.connections.get(connectionId);
    if (connection && connection.heartbeatId) {
      clearInterval(connection.heartbeatId);
      delete connection.heartbeatId;
    }
  }

  /**
   * Replaces the transport of a connection after an exponential backoff
   * delay (reconnectDelay * 2^attempts). Gives up with a 'failed' status
   * notification after maxReconnectAttempts.
   * @param {string} connectionId
   * @returns {Promise<void>}
   */
  async attemptReconnect(connectionId) {
    const connection = this.connections.get(connectionId);
    // One reconnect at a time per connection
    if (!connection || connection.reconnectPending) return;

    const attempts = this.reconnectAttempts.get(connectionId) || 0;
    if (attempts >= this.maxReconnectAttempts) {
      console.error('Max reconnect attempts reached');
      this.notifyConnectionStatus(connectionId, 'failed');
      return;
    }
    this.reconnectAttempts.set(connectionId, attempts + 1);

    // Retire the old transport first so it cannot fire late events or stay
    // open next to its replacement.
    connection.reconnectPending = true;
    this.stopHeartbeat(connectionId);
    this.releaseTransport(connection);
    connection.status = 'connecting';

    // Backoff delay
    const delay = this.reconnectDelay * Math.pow(2, attempts);
    await new Promise((resolve) => setTimeout(resolve, delay));
    connection.reconnectPending = false;

    // The connection may have been closed while waiting
    if (this.connections.get(connectionId) !== connection) return;

    console.log(`Attempting reconnect ${attempts + 1}/${this.maxReconnectAttempts}`);
    try {
      if (connection.type === 'websocket') {
        connection.socket = new WebSocket(connection.url, connection.protocols ?? []);
        this.setupWebSocketHandlers(connection);
      } else if (connection.type === 'sse') {
        connection.eventSource = new EventSource(connection.url);
        this.setupSSEHandlers(connection);
      }
    } catch (error) {
      console.error('Reconnect failed:', error);
      setTimeout(() => this.attemptReconnect(connectionId), delay);
    }
  }

  /**
   * Detaches the handlers of the current socket/event source and closes it.
   * @param {object} connection
   */
  releaseTransport(connection) {
    const transport = connection.type === 'websocket'
      ? connection.socket
      : connection.eventSource;
    if (!transport) return;
    transport.onopen = null;
    transport.onmessage = null;
    transport.onerror = null;
    if (connection.type === 'websocket') {
      transport.onclose = null;
    }
    transport.close();
  }

  /**
   * @param {string} connectionId
   */
  resetReconnectAttempts(connectionId) {
    this.reconnectAttempts.delete(connectionId);
  }

  /**
   * Dispatches a 'connection-status-change' CustomEvent on document with
   * detail { connectionId, status, error }. Does nothing where document or
   * CustomEvent is unavailable.
   * @param {string} connectionId
   * @param {string} status
   * @param {*} [error=null]
   */
  notifyConnectionStatus(connectionId, status, error = null) {
    if (typeof document === 'undefined' || typeof CustomEvent === 'undefined') {
      return;
    }
    const event = new CustomEvent('connection-status-change', {
      detail: { connectionId, status, error }
    });
    document.dispatchEvent(event);
  }

  /**
   * Closes a connection and removes its heartbeat, subscriptions and
   * reconnect bookkeeping.
   * @param {string} connectionId
   */
  closeConnection(connectionId) {
    const connection = this.connections.get(connectionId);
    if (!connection) return;

    // Stop the heartbeat
    this.stopHeartbeat(connectionId);

    // Close the transport
    if (connection.type === 'websocket') {
      connection.socket.close();
    } else if (connection.type === 'sse') {
      connection.eventSource.close();
    }

    // Drop the subscriptions
    connection.subscriptions.forEach((channel) => {
      this.subscriptions.delete(`${connectionId}:${channel}`);
    });

    // Remove the connection
    this.connections.delete(connectionId);
    this.reconnectAttempts.delete(connectionId);
  }

  /**
   * @returns {string} an id of the form `conn_<timestamp>_<random>`
   */
  generateConnectionId() {
    return `conn_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }
}

// Practical applications of real-time data updates:
💼 性能考虑:实时数据更新需要平衡数据新鲜度和系统性能,避免过度频繁的更新影响用户体验
通过本节JavaScript数据可视化数据处理的学习,你已经掌握:
A: 使用分页和虚拟滚动减少DOM渲染压力、实现数据分片和流式处理、使用Web Workers进行后台数据处理、建立多级缓存策略、采用数据压缩和增量更新技术。
A: 根据业务需求设置合适的更新频率、实现自适应更新策略、使用防抖和节流技术控制更新频率、监控网络状况动态调整、提供用户自定义更新频率选项。
A: 建立数据质量评估指标、实现多种清洗算法的交叉验证、提供数据清洗结果的可视化展示、支持用户手动审核和修正、建立数据清洗的回滚机制。
A: 设计统一的数据模型和接口规范、实现灵活的数据映射和转换规则、建立数据源适配器模式、提供数据格式检测和自动转换、支持自定义数据转换脚本。
A: 实现自动重连机制和指数退避算法、建立连接状态监控和通知、实现离线数据缓存和同步、提供连接质量指示器、支持多种实时通信协议的降级策略。
"掌握专业的数据处理技术,是构建高质量数据可视化应用的基础能力。通过系统学习数据获取、清洗、转换和实时更新技术,你将具备处理复杂数据场景的专业技能!"