feat: subscribe to websocket for cluster updates

This commit is contained in:
2025-10-27 13:05:35 +01:00
parent 858be416eb
commit 7784365361
2 changed files with 130 additions and 88 deletions

View File

@@ -23,7 +23,7 @@ LEDLab is a tool for streaming animations to LED matrices connected to SPORE nod
The Node.js server provides the backend for SPORE LEDLab: The Node.js server provides the backend for SPORE LEDLab:
- **Gateway Integration**: Queries spore-gateway for node discovery (requires spore-gateway to be running) - **Gateway Integration**: Subscribes to spore-gateway WebSocket for real-time cluster updates (requires spore-gateway to be running)
- **UDP Frame Streaming**: Sends animation frames to SPORE nodes via UDP on port 4210 - **UDP Frame Streaming**: Sends animation frames to SPORE nodes via UDP on port 4210
- **WebSocket API**: Real-time bidirectional communication with the web UI - **WebSocket API**: Real-time bidirectional communication with the web UI
- **Preset Management**: Manages animation presets with configurable parameters - **Preset Management**: Manages animation presets with configurable parameters
@@ -51,7 +51,7 @@ Web UI (Browser) <--WebSocket--> Server <--UDP--> SPORE Nodes
Preset Engine Preset Engine
| |
Frame Generation (60fps) Frame Generation (60fps)
Gateway API (Discovery) Gateway WebSocket (Real-time cluster updates)
``` ```
## Build ## Build

View File

@@ -1,15 +1,18 @@
// Gateway Client - Communicates with spore-gateway for node discovery // Gateway Client - Communicates with spore-gateway for node discovery
const http = require('http'); const WebSocket = require('ws');
class GatewayClient { class GatewayClient {
constructor(options = {}) { constructor(options = {}) {
this.gatewayUrl = options.gatewayUrl || 'http://localhost:3001'; this.gatewayUrl = options.gatewayUrl || 'http://localhost:3001';
this.pollInterval = options.pollInterval || 2000; // Poll every 2 seconds
this.filterAppLabel = options.filterAppLabel || 'pixelstream'; // Filter nodes by app label, set to null to disable this.filterAppLabel = options.filterAppLabel || 'pixelstream'; // Filter nodes by app label, set to null to disable
this.nodes = new Map(); // ip -> { lastSeen, status, hostname, port } this.nodes = new Map(); // ip -> { lastSeen, status, hostname, port }
this.isRunning = false; this.isRunning = false;
this.pollTimer = null; this.ws = null;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 10;
this.reconnectDelay = 2000;
this.reconnectTimer = null;
} }
start() { start() {
@@ -20,13 +23,7 @@ class GatewayClient {
this.isRunning = true; this.isRunning = true;
console.log(`Starting Gateway client, connecting to ${this.gatewayUrl}`); console.log(`Starting Gateway client, connecting to ${this.gatewayUrl}`);
// Initial fetch this.connectWebSocket();
this.fetchNodes();
// Start polling
this.pollTimer = setInterval(() => {
this.fetchNodes();
}, this.pollInterval);
} }
stop() { stop() {
@@ -36,100 +33,143 @@ class GatewayClient {
this.isRunning = false; this.isRunning = false;
if (this.pollTimer) { if (this.reconnectTimer) {
clearInterval(this.pollTimer); clearTimeout(this.reconnectTimer);
this.pollTimer = null; this.reconnectTimer = null;
}
if (this.ws) {
this.ws.close();
this.ws = null;
} }
this.nodes.clear(); this.nodes.clear();
console.log('Gateway client stopped'); console.log('Gateway client stopped');
} }
async fetchNodes() { connectWebSocket() {
try { try {
const response = await this.httpGet(`${this.gatewayUrl}/api/discovery/nodes`); // Convert http:// to ws:// for WebSocket
const data = JSON.parse(response); const wsUrl = this.gatewayUrl.replace('http://', 'ws://').replace('https://', 'wss://') + '/ws';
console.log(`Connecting to WebSocket: ${wsUrl}`);
// Update nodes from gateway response this.ws = new WebSocket(wsUrl);
const newNodes = new Map();
let totalNodes = 0;
let filteredNodes = 0;
if (data.nodes && Array.isArray(data.nodes)) { this.ws.on('open', () => {
totalNodes = data.nodes.length; console.log('WebSocket connected to gateway');
data.nodes.forEach(node => { this.reconnectAttempts = 0;
// Filter for nodes with specified app label (if filtering is enabled) });
if (this.filterAppLabel && !this.hasAppLabel(node, this.filterAppLabel)) {
filteredNodes++; this.ws.on('message', (data) => {
return; try {
} const message = JSON.parse(data.toString());
this.handleWebSocketMessage(message);
const nodeIp = node.ip; } catch (error) {
newNodes.set(nodeIp, { console.error('Error parsing WebSocket message:', error);
lastSeen: Date.now(),
status: node.status || 'active',
hostname: node.hostname || nodeIp,
port: node.port || 4210,
isPrimary: node.isPrimary || false
});
});
//if (totalNodes > 0 && filteredNodes > 0 && this.filterAppLabel) {
// console.loh(`Filtered ${filteredNodes} nodes without app: ${this.filterAppLabel} label (${newNodes.size} ${this.filterAppLabel} nodes active)`);
//}
}
// Check for newly discovered nodes
for (const [ip, nodeInfo] of newNodes.entries()) {
const existingNode = this.nodes.get(ip);
if (!existingNode) {
console.log(`Node discovered via gateway: ${ip} (${nodeInfo.hostname})`);
this.nodes.set(ip, nodeInfo);
// Could emit an event here if needed: this.emit('nodeDiscovered', nodeInfo);
} else if (existingNode.hostname !== nodeInfo.hostname) {
console.log(`Node hostname updated: ${ip} -> ${nodeInfo.hostname}`);
this.nodes.set(ip, nodeInfo);
} }
} });
// Check for lost nodes this.ws.on('close', (code, reason) => {
for (const ip of this.nodes.keys()) { console.log(`WebSocket connection closed: ${code} ${reason}`);
if (!newNodes.has(ip)) { if (this.isRunning) {
console.log(`Node lost via gateway: ${ip}`); this.scheduleReconnect();
this.nodes.delete(ip);
// Could emit an event here if needed: this.emit('nodeLost', { ip });
} }
} });
this.ws.on('error', (error) => {
console.error('WebSocket error:', error.message);
});
} catch (error) { } catch (error) {
console.error('Error fetching nodes from gateway:', error.message); console.error('Failed to create WebSocket connection:', error);
this.scheduleReconnect();
} }
} }
httpGet(url) { handleWebSocketMessage(message) {
return new Promise((resolve, reject) => { if (message.topic === 'cluster/update') {
http.get(url, (res) => { this.handleClusterUpdate(message.members, message.primaryNode, message.totalNodes);
let data = ''; } else if (message.topic === 'node/discovery') {
this.handleNodeDiscovery(message.action, message.nodeIp);
}
}
handleClusterUpdate(members, primaryNode, totalNodes) {
const newNodes = new Map();
if (members && Array.isArray(members)) {
members.forEach(node => {
// Filter for nodes with specified app label (if filtering is enabled)
if (this.filterAppLabel && !this.hasAppLabel(node, this.filterAppLabel)) {
return;
}
res.on('data', (chunk) => { const nodeIp = node.ip || node.IP;
data += chunk; newNodes.set(nodeIp, {
lastSeen: Date.now(),
status: node.status || node.Status || 'active',
hostname: node.hostname || node.Hostname || nodeIp,
port: node.port || node.Port || 4210,
isPrimary: (primaryNode === nodeIp),
labels: node.labels || node.Labels
}); });
res.on('end', () => {
if (res.statusCode === 200) {
resolve(data);
} else {
reject(new Error(`HTTP ${res.statusCode}: ${data}`));
}
});
res.on('error', (err) => {
reject(err);
});
}).on('error', (err) => {
reject(err);
}); });
}); }
// Check for newly discovered nodes
for (const [ip, nodeInfo] of newNodes.entries()) {
const existingNode = this.nodes.get(ip);
if (!existingNode) {
console.log(`Node discovered via gateway: ${ip} (${nodeInfo.hostname})`);
this.nodes.set(ip, nodeInfo);
} else if (existingNode.hostname !== nodeInfo.hostname) {
console.log(`Node hostname updated: ${ip} -> ${nodeInfo.hostname}`);
this.nodes.set(ip, nodeInfo);
} else {
// Update status and last seen
existingNode.status = nodeInfo.status;
existingNode.lastSeen = nodeInfo.lastSeen;
existingNode.isPrimary = nodeInfo.isPrimary;
}
}
// Check for lost nodes
for (const ip of this.nodes.keys()) {
if (!newNodes.has(ip)) {
console.log(`Node lost via gateway: ${ip}`);
this.nodes.delete(ip);
}
}
}
handleNodeDiscovery(action, nodeIp) {
if (action === 'discovered') {
// Node was discovered - actual data will come via cluster/update
console.log(`Node discovered event: ${nodeIp}`);
} else if (action === 'removed') {
console.log(`Node removed event: ${nodeIp}`);
this.nodes.delete(nodeIp);
}
}
scheduleReconnect() {
if (!this.isRunning) {
return;
}
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error('Max WebSocket reconnection attempts reached');
return;
}
this.reconnectAttempts++;
const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
console.log(`Attempting to reconnect WebSocket in ${delay}ms (attempt ${this.reconnectAttempts})`);
this.reconnectTimer = setTimeout(() => {
this.connectWebSocket();
}, delay);
} }
getNodes() { getNodes() {
@@ -138,6 +178,7 @@ class GatewayClient {
hostname: node.hostname || ip, hostname: node.hostname || ip,
port: node.port, port: node.port,
status: node.status, status: node.status,
isPrimary: node.isPrimary,
...node ...node
})); }));
} }
@@ -158,3 +199,4 @@ class GatewayClient {
module.exports = GatewayClient; module.exports = GatewayClient;