From 778436536141eacfe365e1fc69912d5109414f08 Mon Sep 17 00:00:00 2001 From: 0x1d Date: Mon, 27 Oct 2025 13:05:35 +0100 Subject: [PATCH] feat: subscribe to websocket for cluster updates --- README.md | 4 +- server/gateway-client.js | 214 +++++++++++++++++++++++---------------- 2 files changed, 130 insertions(+), 88 deletions(-) diff --git a/README.md b/README.md index 9dd508b..cae0d39 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ LEDLab is a tool for streaming animations to LED matrices connected to SPORE nod The Node.js server provides the backend for SPORE LEDLab: -- **Gateway Integration**: Queries spore-gateway for node discovery (requires spore-gateway to be running) +- **Gateway Integration**: Subscribes to spore-gateway WebSocket for real-time cluster updates (requires spore-gateway to be running) - **UDP Frame Streaming**: Sends animation frames to SPORE nodes via UDP on port 4210 - **WebSocket API**: Real-time bidirectional communication with the web UI - **Preset Management**: Manages animation presets with configurable parameters @@ -51,7 +51,7 @@ Web UI (Browser) <--WebSocket--> Server <--UDP--> SPORE Nodes Preset Engine | Frame Generation (60fps) - Gateway API (Discovery) + Gateway WebSocket (Real-time cluster updates) ``` ## Build diff --git a/server/gateway-client.js b/server/gateway-client.js index 142e0ee..733a6eb 100644 --- a/server/gateway-client.js +++ b/server/gateway-client.js @@ -1,15 +1,18 @@ // Gateway Client - Communicates with spore-gateway for node discovery -const http = require('http'); +const WebSocket = require('ws'); class GatewayClient { constructor(options = {}) { this.gatewayUrl = options.gatewayUrl || 'http://localhost:3001'; - this.pollInterval = options.pollInterval || 2000; // Poll every 2 seconds this.filterAppLabel = options.filterAppLabel || 'pixelstream'; // Filter nodes by app label, set to null to disable this.nodes = new Map(); // ip -> { lastSeen, status, hostname, port } this.isRunning = false; - this.pollTimer = null; + this.ws = null; + this.reconnectAttempts = 0; + this.maxReconnectAttempts = 10; + this.reconnectDelay = 2000; + this.reconnectTimer = null; } start() { @@ -20,13 +23,7 @@ class GatewayClient { this.isRunning = true; console.log(`Starting Gateway client, connecting to ${this.gatewayUrl}`); - // Initial fetch - this.fetchNodes(); - - // Start polling - this.pollTimer = setInterval(() => { - this.fetchNodes(); - }, this.pollInterval); + this.connectWebSocket(); } stop() { @@ -36,100 +33,143 @@ class GatewayClient { this.isRunning = false; - if (this.pollTimer) { - clearInterval(this.pollTimer); - this.pollTimer = null; + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = null; + } + + if (this.ws) { + this.ws.close(); + this.ws = null; } this.nodes.clear(); console.log('Gateway client stopped'); } - async fetchNodes() { + connectWebSocket() { try { - const response = await this.httpGet(`${this.gatewayUrl}/api/discovery/nodes`); - const data = JSON.parse(response); + // Convert http:// to ws:// for WebSocket + const wsUrl = this.gatewayUrl.replace('http://', 'ws://').replace('https://', 'wss://') + '/ws'; + console.log(`Connecting to WebSocket: ${wsUrl}`); - // Update nodes from gateway response - const newNodes = new Map(); - let totalNodes = 0; - let filteredNodes = 0; + this.ws = new WebSocket(wsUrl); - if (data.nodes && Array.isArray(data.nodes)) { - totalNodes = data.nodes.length; - data.nodes.forEach(node => { - // Filter for nodes with specified app label (if filtering is enabled) - if (this.filterAppLabel && !this.hasAppLabel(node, this.filterAppLabel)) { - filteredNodes++; - return; - } - - const nodeIp = node.ip; - newNodes.set(nodeIp, { - lastSeen: Date.now(), - status: node.status || 'active', - hostname: node.hostname || nodeIp, - port: node.port || 4210, - isPrimary: node.isPrimary || false - }); - }); - - //if (totalNodes > 0 && filteredNodes > 0 && this.filterAppLabel) { - // console.loh(`Filtered ${filteredNodes} nodes without app: ${this.filterAppLabel} label (${newNodes.size} ${this.filterAppLabel} nodes active)`); - //} - } - - // Check for newly discovered nodes - for (const [ip, nodeInfo] of newNodes.entries()) { - const existingNode = this.nodes.get(ip); - if (!existingNode) { - console.log(`Node discovered via gateway: ${ip} (${nodeInfo.hostname})`); - this.nodes.set(ip, nodeInfo); - // Could emit an event here if needed: this.emit('nodeDiscovered', nodeInfo); - } else if (existingNode.hostname !== nodeInfo.hostname) { - console.log(`Node hostname updated: ${ip} -> ${nodeInfo.hostname}`); - this.nodes.set(ip, nodeInfo); + this.ws.on('open', () => { + console.log('WebSocket connected to gateway'); + this.reconnectAttempts = 0; + }); + + this.ws.on('message', (data) => { + try { + const message = JSON.parse(data.toString()); + this.handleWebSocketMessage(message); + } catch (error) { + console.error('Error parsing WebSocket message:', error); } - } - - // Check for lost nodes - for (const ip of this.nodes.keys()) { - if (!newNodes.has(ip)) { - console.log(`Node lost via gateway: ${ip}`); - this.nodes.delete(ip); - // Could emit an event here if needed: this.emit('nodeLost', { ip }); + }); + + this.ws.on('close', (code, reason) => { + console.log(`WebSocket connection closed: ${code} ${reason}`); + if (this.isRunning) { + this.scheduleReconnect(); } - } - + }); + + this.ws.on('error', (error) => { + console.error('WebSocket error:', error.message); + }); + } catch (error) { - console.error('Error fetching nodes from gateway:', error.message); + console.error('Failed to create WebSocket connection:', error); + this.scheduleReconnect(); } } - httpGet(url) { - return new Promise((resolve, reject) => { - http.get(url, (res) => { - let data = ''; + handleWebSocketMessage(message) { + if (message.topic === 'cluster/update') { + this.handleClusterUpdate(message.members, message.primaryNode, message.totalNodes); + } else if (message.topic === 'node/discovery') { + this.handleNodeDiscovery(message.action, message.nodeIp); + } + } + + handleClusterUpdate(members, primaryNode, totalNodes) { + const newNodes = new Map(); + + if (members && Array.isArray(members)) { + members.forEach(node => { + // Filter for nodes with specified app label (if filtering is enabled) + if (this.filterAppLabel && !this.hasAppLabel(node, this.filterAppLabel)) { + return; + } - res.on('data', (chunk) => { - data += chunk; + const nodeIp = node.ip || node.IP; + newNodes.set(nodeIp, { + lastSeen: Date.now(), + status: node.status || node.Status || 'active', + hostname: node.hostname || node.Hostname || nodeIp, + port: node.port || node.Port || 4210, + isPrimary: (primaryNode === nodeIp), + labels: node.labels || node.Labels }); - - res.on('end', () => { - if (res.statusCode === 200) { - resolve(data); - } else { - reject(new Error(`HTTP ${res.statusCode}: ${data}`)); - } - }); - - res.on('error', (err) => { - reject(err); - }); - }).on('error', (err) => { - reject(err); }); - }); + } + + // Check for newly discovered nodes + for (const [ip, nodeInfo] of newNodes.entries()) { + const existingNode = this.nodes.get(ip); + if (!existingNode) { + console.log(`Node discovered via gateway: ${ip} (${nodeInfo.hostname})`); + this.nodes.set(ip, nodeInfo); + } else if (existingNode.hostname !== nodeInfo.hostname) { + console.log(`Node hostname updated: ${ip} -> ${nodeInfo.hostname}`); + this.nodes.set(ip, nodeInfo); + } else { + // Update status and last seen + existingNode.status = nodeInfo.status; + existingNode.lastSeen = nodeInfo.lastSeen; + existingNode.isPrimary = nodeInfo.isPrimary; + } + } + + // Check for lost nodes + for (const ip of this.nodes.keys()) { + if (!newNodes.has(ip)) { + console.log(`Node lost via gateway: ${ip}`); + this.nodes.delete(ip); + } + } + } + + handleNodeDiscovery(action, nodeIp) { + if (action === 'discovered') { + // Node was discovered - actual data will come via cluster/update + console.log(`Node discovered event: ${nodeIp}`); + } else if (action === 'removed') { + console.log(`Node removed event: ${nodeIp}`); + this.nodes.delete(nodeIp); + } + } + + scheduleReconnect() { + if (!this.isRunning) { + return; + } + + if (this.reconnectAttempts >= this.maxReconnectAttempts) { + console.error('Max WebSocket reconnection attempts reached'); + return; + } + + this.reconnectAttempts++; + const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1); + + console.log(`Attempting to reconnect WebSocket in ${delay}ms (attempt ${this.reconnectAttempts})`); + + this.reconnectTimer = setTimeout(() => { + this.connectWebSocket(); + }, delay); } getNodes() { @@ -138,6 +178,7 @@ class GatewayClient { hostname: node.hostname || ip, port: node.port, status: node.status, + isPrimary: node.isPrimary, ...node })); } @@ -158,3 +199,4 @@ class GatewayClient { module.exports = GatewayClient; +