// UDP Discovery service for SPORE nodes const dgram = require('dgram'); const EventEmitter = require('events'); const os = require('os'); class UdpDiscovery extends EventEmitter { constructor(port = 4210) { super(); this.port = port; this.socket = null; this.nodes = new Map(); // ip -> { lastSeen, status } this.discoveryInterval = null; this.isRunning = false; // Get local network interfaces to filter out local server this.localInterfaces = this.getLocalInterfaces(); } getLocalInterfaces() { const interfaces = os.networkInterfaces(); const localIPs = new Set(); Object.values(interfaces).forEach(iface => { iface.forEach(addr => { if (addr.family === 'IPv4' && !addr.internal) { localIPs.add(addr.address); } }); }); return localIPs; } start() { if (this.isRunning) { return; } this.socket = dgram.createSocket('udp4'); this.isRunning = true; this.socket.on('message', (msg, rinfo) => { this.handleMessage(msg, rinfo); }); this.socket.on('error', (err) => { console.error('UDP Discovery socket error:', err); this.emit('error', err); }); this.socket.bind(this.port, () => { console.log(`UDP Discovery listening on port ${this.port}`); // Enable broadcast after binding this.socket.setBroadcast(true); this.emit('started'); }); // Start periodic discovery broadcast this.startDiscoveryBroadcast(); } stop() { if (!this.isRunning) { return; } this.isRunning = false; if (this.discoveryInterval) { clearInterval(this.discoveryInterval); this.discoveryInterval = null; } if (this.socket) { this.socket.close(); this.socket = null; } this.nodes.clear(); console.log('UDP Discovery stopped'); this.emit('stopped'); } handleMessage(msg, rinfo) { const message = msg.toString('utf8'); const nodeIp = rinfo.address; // Skip local server IPs if (this.localInterfaces.has(nodeIp)) { return; } // Handle different message types if (message.startsWith('cluster/heartbeat:')) { // Extract hostname from heartbeat: "cluster/heartbeat:hostname" const hostname = message.substring('cluster/heartbeat:'.length); this.handleHeartbeat(hostname, nodeIp, rinfo.port); } else if (message.startsWith('node/update:')) { // Extract hostname and JSON from update: "node/update:hostname:{json}" const parts = message.substring('node/update:'.length).split(':'); if (parts.length >= 2) { const hostname = parts[0]; const jsonStr = parts.slice(1).join(':'); // Rejoin in case JSON contains colons this.handleNodeUpdate(hostname, jsonStr, nodeIp, rinfo.port); } } } handleHeartbeat(hostname, nodeIp, port) { console.log(`Heartbeat from ${hostname} @ ${nodeIp}`); // Update or add node const existingNode = this.nodes.get(nodeIp); this.nodes.set(nodeIp, { lastSeen: Date.now(), status: 'connected', address: nodeIp, port: port, hostname: hostname }); // Only emit if this is a new node or if we need to update if (!existingNode || existingNode.hostname !== hostname) { this.emit('nodeDiscovered', { ip: nodeIp, hostname: hostname, port: port, status: 'connected' }); } // Clean up stale nodes periodically this.cleanupStaleNodes(); } handleNodeUpdate(hostname, jsonStr, nodeIp, port) { console.log(`Node update from ${hostname} @ ${nodeIp}`); // Try to parse JSON to extract additional info let nodeInfo = {}; try { nodeInfo = JSON.parse(jsonStr); } catch (e) { console.warn(`Failed to parse node update JSON: ${e.message}`); } // Update node with hostname and any additional info const existingNode = this.nodes.get(nodeIp); this.nodes.set(nodeIp, { lastSeen: Date.now(), status: 'connected', address: nodeIp, port: port, hostname: hostname || nodeInfo.hostname || existingNode?.hostname, ...nodeInfo }); // Emit update event this.emit('nodeDiscovered', { ip: nodeIp, hostname: hostname || nodeInfo.hostname || existingNode?.hostname, port: port, status: 'connected' }); } startDiscoveryBroadcast() { // With the new protocol, SPORE nodes automatically broadcast heartbeats // LEDLab passively listens for these heartbeats, so we don't need to broadcast. // However, we can optionally send a heartbeat to prompt nodes to respond faster. // For now, we just listen for incoming heartbeats from nodes. // Optional: send initial heartbeat to prompt nodes to announce themselves this.broadcastHeartbeat(); // Send periodic heartbeats to prompt node announcements (every 10 seconds) this.discoveryInterval = setInterval(() => { this.broadcastHeartbeat(); }, 10000); } broadcastHeartbeat() { if (!this.socket) { return; } // Send heartbeat using the new protocol format: "cluster/heartbeat:hostname" const hostname = 'ledlab-client'; const discoveryMessage = `cluster/heartbeat:${hostname}`; const message = Buffer.from(discoveryMessage, 'utf8'); this.socket.send(message, 0, message.length, this.port, '255.255.255.255', (err) => { if (err) { console.error('Error broadcasting heartbeat:', err); } else { console.log('Discovery heartbeat broadcasted'); } }); } cleanupStaleNodes() { const now = Date.now(); const staleThreshold = 10000; // 10 seconds for (const [ip, node] of this.nodes.entries()) { if (now - node.lastSeen > staleThreshold) { this.nodes.delete(ip); this.emit('nodeLost', { ip, status: 'disconnected' }); } } } getNodes() { const nodes = Array.from(this.nodes.entries()).map(([ip, node]) => ({ ip, hostname: node.hostname || ip, ...node })); return nodes; } getNodeCount() { return this.nodes.size; } sendToNode(nodeIp, message) { if (!this.socket) { return false; } const buffer = Buffer.from(message, 'utf8'); this.socket.send(buffer, 0, buffer.length, this.port, nodeIp, (err) => { if (err) { console.error(`Error sending to node ${nodeIp}:`, err); return false; } return true; }); return true; } broadcastToAll(message) { if (!this.socket) { return false; } const buffer = Buffer.from(message, 'utf8'); this.socket.setBroadcast(true); this.socket.send(buffer, 0, buffer.length, this.port, '255.255.255.255', (err) => { if (err) { console.error('Error broadcasting message:', err); return false; } return true; }); return true; } } module.exports = UdpDiscovery;