// File: spore-ledlab/server/gateway-client.js
// (repository viewer header: 203 lines, 5.6 KiB, JavaScript)

// Gateway Client - Communicates with spore-gateway for node discovery
const WebSocket = require('ws');
/**
 * WebSocket client that mirrors the node list published by spore-gateway.
 *
 * It connects to `<gatewayUrl>/ws`, consumes `cluster/update` and
 * `node/discovery` messages, and keeps an in-memory map of known nodes keyed
 * by IP address. Lost connections are retried with exponential backoff.
 */
class GatewayClient {
  /**
   * @param {object} [options]
   * @param {string} [options.gatewayUrl='http://localhost:3001'] Base HTTP(S) URL of the gateway.
   * @param {?string} [options.filterAppLabel='pixelstream'] Only keep nodes whose `app` label
   *   equals this value; pass `null` to disable filtering.
   * @param {number} [options.maxReconnectAttempts=10] Consecutive reconnect attempts before giving up.
   * @param {number} [options.reconnectDelay=2000] Base reconnect delay in ms (doubled on every attempt).
   */
  constructor(options = {}) {
    this.gatewayUrl = options.gatewayUrl || 'http://localhost:3001';
    // Fall back to the default only when the option is omitted: an explicit
    // `null` must survive so that filtering can actually be disabled.
    this.filterAppLabel = options.filterAppLabel === undefined
      ? 'pixelstream'
      : options.filterAppLabel;
    this.nodes = new Map(); // ip -> { lastSeen, status, hostname, port, isPrimary, labels }
    this.isRunning = false;
    this.ws = null;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts =
      Number.isInteger(options.maxReconnectAttempts) && options.maxReconnectAttempts >= 0
        ? options.maxReconnectAttempts
        : 10;
    this.reconnectDelay =
      Number.isFinite(options.reconnectDelay) && options.reconnectDelay >= 0
        ? options.reconnectDelay
        : 2000;
    this.reconnectTimer = null;
  }

  /**
   * Starts the client and opens the WebSocket connection. No-op when already running.
   */
  start() {
    if (this.isRunning) {
      return;
    }
    this.isRunning = true;
    // A fresh start gets a fresh retry budget; otherwise a client that hit the
    // retry limit could never reconnect after stop()/start().
    this.reconnectAttempts = 0;
    console.log(`Starting Gateway client, connecting to ${this.gatewayUrl}`);
    this.connectWebSocket();
  }

  /**
   * Stops the client: cancels any pending reconnect, closes the socket and
   * forgets all known nodes. No-op when not running.
   */
  stop() {
    if (!this.isRunning) {
      return;
    }
    this.isRunning = false;
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    // Detach the reference before closing so that the socket's late events are
    // recognised as stale by the handlers installed in connectWebSocket().
    const socket = this.ws;
    this.ws = null;
    if (socket) {
      socket.close();
    }
    this.nodes.clear();
    console.log('Gateway client stopped');
  }

  /**
   * Derives the WebSocket endpoint from `gatewayUrl`.
   *
   * @returns {string} `gatewayUrl` with http(s) mapped to ws(s), trailing
   *   slashes removed and `/ws` appended.
   */
  buildWebSocketUrl() {
    // Replacing the leading "http" turns http:// into ws:// and https:// into wss://.
    const base = this.gatewayUrl.replace(/^http/i, 'ws').replace(/\/+$/, '');
    return `${base}/ws`;
  }

  /**
   * Opens a new WebSocket to the gateway and wires up its event handlers.
   * Any failure to create the socket schedules a reconnect.
   */
  connectWebSocket() {
    try {
      const wsUrl = this.buildWebSocketUrl();
      console.log(`Connecting to WebSocket: ${wsUrl}`);
      const socket = new WebSocket(wsUrl);
      this.ws = socket;

      socket.on('open', () => {
        console.log('WebSocket connected to gateway');
        this.reconnectAttempts = 0;
      });

      socket.on('message', (data) => {
        // Ignore traffic from a socket that has already been replaced or stopped.
        if (this.ws !== socket) {
          return;
        }
        try {
          const message = JSON.parse(data.toString());
          this.handleWebSocketMessage(message);
        } catch (error) {
          console.error('Error parsing WebSocket message:', error);
        }
      });

      socket.on('close', (code, reason) => {
        console.log(`WebSocket connection closed: ${code} ${reason}`);
        // A close event from a superseded socket must not trigger a second,
        // parallel reconnect loop.
        if (this.ws !== socket) {
          return;
        }
        this.ws = null;
        if (this.isRunning) {
          this.scheduleReconnect();
        }
      });

      // Always keep an 'error' listener attached: an unhandled 'error' event
      // on an EventEmitter would crash the process.
      socket.on('error', (error) => {
        console.error('WebSocket error:', error.message);
      });
    } catch (error) {
      console.error('Failed to create WebSocket connection:', error);
      this.scheduleReconnect();
    }
  }

  /**
   * Dispatches a parsed gateway message by its `topic`.
   *
   * @param {object} message Parsed JSON message; non-object values are ignored.
   */
  handleWebSocketMessage(message) {
    if (!message || typeof message !== 'object') {
      return;
    }
    if (message.topic === 'cluster/update') {
      this.handleClusterUpdate(message.members, message.primaryNode, message.totalNodes);
    } else if (message.topic === 'node/discovery') {
      this.handleNodeDiscovery(message.action, message.nodeIp);
    }
  }

  /**
   * Reconciles the known-node map with a full cluster snapshot: new members
   * are added, existing ones refreshed, and members missing from the snapshot
   * are removed.
   *
   * @param {Array<object>} members Cluster members; fields are read in both
   *   lower-case and capitalised form (`ip`/`IP`, `hostname`/`Hostname`, ...).
   * @param {string} primaryNode IP of the primary node.
   * @param {number} totalNodes Total node count reported by the gateway (currently unused).
   */
  handleClusterUpdate(members, primaryNode, totalNodes) {
    const newNodes = new Map();
    if (Array.isArray(members)) {
      for (const node of members) {
        // Skip malformed entries instead of letting one bad member abort the whole update.
        if (!node || typeof node !== 'object') {
          continue;
        }
        // Filter for nodes with the specified app label (if filtering is enabled)
        if (this.filterAppLabel && !this.hasAppLabel(node, this.filterAppLabel)) {
          continue;
        }
        const nodeIp = node.ip || node.IP;
        // The map is keyed by IP; an entry without one could never be addressed.
        if (!nodeIp) {
          continue;
        }
        newNodes.set(nodeIp, {
          lastSeen: Date.now(),
          status: node.status || node.Status || 'active',
          hostname: node.hostname || node.Hostname || nodeIp,
          port: node.port || node.Port || 4210,
          isPrimary: (primaryNode === nodeIp),
          labels: node.labels || node.Labels
        });
      }
    }

    // Add newly discovered nodes and refresh the ones already known
    for (const [ip, nodeInfo] of newNodes.entries()) {
      const existingNode = this.nodes.get(ip);
      if (!existingNode) {
        console.log(`Node discovered via gateway: ${ip} (${nodeInfo.hostname})`);
        this.nodes.set(ip, nodeInfo);
        continue;
      }
      if (existingNode.hostname !== nodeInfo.hostname) {
        console.log(`Node hostname updated: ${ip} -> ${nodeInfo.hostname}`);
      }
      // Refresh every field (including port and labels) so stale values do not linger.
      Object.assign(existingNode, nodeInfo);
    }

    // Drop nodes that are no longer part of the snapshot
    for (const ip of this.nodes.keys()) {
      if (!newNodes.has(ip)) {
        console.log(`Node lost via gateway: ${ip}`);
        this.nodes.delete(ip);
      }
    }
  }

  /**
   * Handles a `node/discovery` event. Only removals change local state;
   * discoveries are logged because the node data arrives via `cluster/update`.
   *
   * @param {string} action `'discovered'` or `'removed'`; other values are ignored.
   * @param {string} nodeIp IP address the event refers to.
   */
  handleNodeDiscovery(action, nodeIp) {
    if (action === 'discovered') {
      // Node was discovered - actual data will come via cluster/update
      console.log(`Node discovered event: ${nodeIp}`);
    } else if (action === 'removed') {
      console.log(`Node removed event: ${nodeIp}`);
      this.nodes.delete(nodeIp);
    }
  }

  /**
   * Schedules the next reconnect attempt with exponential backoff
   * (`reconnectDelay * 2^(attempt - 1)`). Does nothing when the client is
   * stopped or the attempt limit has been reached.
   */
  scheduleReconnect() {
    if (!this.isRunning) {
      return;
    }
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error('Max WebSocket reconnection attempts reached');
      return;
    }
    // Never keep two reconnect timers alive at once.
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
    }
    this.reconnectAttempts++;
    const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
    console.log(`Attempting to reconnect WebSocket in ${delay}ms (attempt ${this.reconnectAttempts})`);
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.connectWebSocket();
    }, delay);
  }

  /**
   * Returns a snapshot of the known nodes.
   *
   * @returns {Array<object>} One object per node containing `ip`, `hostname`
   *   (falling back to the IP when empty), `port`, `status`, `isPrimary` and
   *   every other field stored for the node.
   */
  getNodes() {
    return Array.from(this.nodes.entries()).map(([ip, node]) => {
      // Pull hostname out of the spread so the fallback below cannot be overwritten by it.
      const { hostname, ...rest } = node;
      return {
        ip,
        hostname: hostname || ip,
        port: node.port,
        status: node.status,
        isPrimary: node.isPrimary,
        ...rest
      };
    });
  }

  /**
   * @returns {number} Number of nodes currently known.
   */
  getNodeCount() {
    return this.nodes.size;
  }

  /**
   * Checks whether a node carries the label `app: <appLabel>`.
   *
   * @param {object} node Cluster member; labels are read from `labels` or `Labels`.
   * @param {string} appLabel Expected value of the `app` label.
   * @returns {boolean} `true` when the node's `app` label equals `appLabel`.
   */
  hasAppLabel(node, appLabel) {
    if (!node || typeof node !== 'object') {
      return false;
    }
    // Accept both spellings, matching how handleClusterUpdate reads the other fields.
    const labels = node.labels || node.Labels;
    if (!labels || typeof labels !== 'object') {
      return false;
    }
    return labels.app === appLabel;
  }
}
module.exports = GatewayClient;