// UDP Discovery service for SPORE nodes
const dgram = require('dgram');
const EventEmitter = require('events');
const os = require('os');

// Destination address used for every broadcast datagram.
const BROADCAST_ADDRESS = '255.255.255.255';
// Payload of the periodic discovery probe.
const DISCOVERY_MESSAGE = 'SPORE_DISCOVERY';
// Delay between two discovery probes.
const DISCOVERY_INTERVAL_MS = 5000;
// A node that has been silent for longer than this is reported as lost.
const STALE_THRESHOLD_MS = 10000;

/**
 * Discovers nodes on the local network over UDP.
 *
 * The service binds a UDP4 socket to `port`, broadcasts a discovery probe on
 * a fixed interval and records the sender of every datagram it receives from
 * a non-local address.
 *
 * Events:
 *  - `started`        — the socket is bound and the probe timer is running.
 *  - `stopped`        — the service was shut down via `stop()`.
 *  - `error`          — the socket reported an error (the error is the payload).
 *  - `nodeDiscovered` — `{ ip, port, status: 'connected' }`, emitted for every
 *                       datagram received from a non-local address.
 *  - `nodeLost`       — `{ ip, status: 'disconnected' }`, emitted when a node
 *                       has been silent for more than 10 seconds.
 */
class UdpDiscovery extends EventEmitter {
  /**
   * @param {number} [port=4210] UDP port used both for listening and as the
   *   destination port of every outgoing datagram.
   */
  constructor(port = 4210) {
    super();
    this.port = port;
    this.socket = null;
    this.nodes = new Map(); // ip -> { lastSeen, status, address, port }
    this.discoveryInterval = null;
    this.isRunning = false;

    // Captured once at construction so datagrams sent by this host itself
    // can be filtered out in handleMessage().
    this.localInterfaces = this.getLocalInterfaces();
  }

  /**
   * Collects the non-internal IPv4 addresses of this host.
   *
   * @returns {Set<string>} Addresses of all external IPv4 interfaces.
   */
  getLocalInterfaces() {
    const localIPs = new Set();

    for (const addresses of Object.values(os.networkInterfaces())) {
      for (const addr of addresses || []) {
        // `family` is normally the string 'IPv4'; some Node.js 18 releases
        // report the number 4 instead, so both spellings are accepted.
        const isIPv4 = addr.family === 'IPv4' || addr.family === 4;
        if (isIPv4 && !addr.internal) {
          localIPs.add(addr.address);
        }
      }
    }

    return localIPs;
  }

  /**
   * Creates the socket, binds it and starts the periodic discovery probe.
   * Does nothing when the service is already running.
   */
  start() {
    if (this.isRunning) {
      return;
    }

    const socket = dgram.createSocket('udp4');
    this.socket = socket;
    this.isRunning = true;

    socket.on('message', (msg, rinfo) => {
      this.handleMessage(msg, rinfo);
    });

    socket.on('error', (err) => {
      console.error('UDP Discovery socket error:', err);
      this.emit('error', err);
    });

    socket.bind(this.port, () => {
      // stop() may have run while the bind was still pending; in that case
      // this socket no longer belongs to the service and must not be used.
      if (this.socket !== socket) {
        return;
      }

      console.log(`UDP Discovery listening on port ${this.port}`);
      // Broadcast can only be enabled on a bound socket.
      socket.setBroadcast(true);
      // The probe timer is started only once the bind succeeded, so probes
      // are never sent from a socket that is not bound to `this.port`.
      this.startDiscoveryBroadcast();
      this.emit('started');
    });
  }

  /**
   * Stops the probe timer, closes the socket and forgets all known nodes.
   * No `nodeLost` events are emitted for the nodes that are dropped here.
   * Does nothing when the service is not running.
   */
  stop() {
    if (!this.isRunning) {
      return;
    }

    this.isRunning = false;

    if (this.discoveryInterval) {
      clearInterval(this.discoveryInterval);
      this.discoveryInterval = null;
    }

    if (this.socket) {
      this.socket.close();
      this.socket = null;
    }

    this.nodes.clear();
    console.log('UDP Discovery stopped');
    this.emit('stopped');
  }

  /**
   * Records the sender of an incoming datagram as a live node.
   *
   * The payload is not inspected: any datagram from an address that is not
   * one of this host's own interfaces refreshes (or creates) the node entry
   * and triggers a `nodeDiscovered` event.
   *
   * @param {Buffer} msg Datagram payload (currently unused).
   * @param {{address: string, port: number}} rinfo Sender information.
   */
  handleMessage(msg, rinfo) {
    const nodeIp = rinfo.address;

    // Skip datagrams that originate from this host.
    if (this.localInterfaces.has(nodeIp)) {
      return;
    }

    this.nodes.set(nodeIp, {
      lastSeen: Date.now(),
      status: 'connected',
      address: nodeIp,
      port: rinfo.port
    });

    this.emit('nodeDiscovered', {
      ip: nodeIp,
      port: rinfo.port,
      status: 'connected'
    });

    this.cleanupStaleNodes();
  }

  /**
   * Starts the periodic discovery cycle and runs one cycle immediately.
   * A timer left over from an earlier call is cleared first so that at most
   * one timer is ever active.
   */
  startDiscoveryBroadcast() {
    if (this.discoveryInterval) {
      clearInterval(this.discoveryInterval);
    }

    this.discoveryInterval = setInterval(() => {
      this.discoveryTick();
    }, DISCOVERY_INTERVAL_MS);

    // Run the first cycle right away instead of waiting a full interval.
    this.discoveryTick();
  }

  /**
   * One discovery cycle: send a probe, then evict nodes that stopped
   * answering. Eviction has to happen here as well as in handleMessage(),
   * otherwise the last remaining node would never be reported as lost once
   * it goes silent, because no further datagram would trigger the cleanup.
   */
  discoveryTick() {
    this.broadcastDiscovery();
    this.cleanupStaleNodes();
  }

  /**
   * Broadcasts the discovery probe on `this.port`.
   * Does nothing when there is no socket.
   */
  broadcastDiscovery() {
    if (!this.socket) {
      return;
    }

    const message = Buffer.from(DISCOVERY_MESSAGE, 'utf8');

    this.socket.send(message, 0, message.length, this.port, BROADCAST_ADDRESS, (err) => {
      if (err) {
        console.error('Error broadcasting discovery message:', err);
      } else {
        console.log('Discovery message broadcasted');
      }
    });
  }

  /**
   * Removes every node whose last datagram is older than the stale
   * threshold (10 seconds) and emits `nodeLost` for each of them.
   */
  cleanupStaleNodes() {
    const now = Date.now();

    for (const [ip, node] of this.nodes.entries()) {
      if (now - node.lastSeen > STALE_THRESHOLD_MS) {
        this.nodes.delete(ip);
        this.emit('nodeLost', { ip, status: 'disconnected' });
      }
    }
  }

  /**
   * Lists the known nodes, preceded by a synthetic "broadcast" entry.
   *
   * @returns {Array<Object>} The broadcast pseudo-node first, followed by
   *   one `{ ip, lastSeen, status, address, port }` object per known node.
   */
  getNodes() {
    const broadcastEntry = {
      ip: 'broadcast',
      status: 'broadcast',
      address: BROADCAST_ADDRESS,
      port: this.port,
      isBroadcast: true
    };

    const knownNodes = Array.from(this.nodes.entries(), ([ip, node]) => ({ ip, ...node }));

    return [broadcastEntry, ...knownNodes];
  }

  /**
   * @returns {number} Number of known nodes (the broadcast entry excluded).
   */
  getNodeCount() {
    return this.nodes.size;
  }

  /**
   * Sends a UTF-8 encoded message to a single node on `this.port`.
   *
   * The return value only tells whether the datagram was handed to the
   * socket; a failure reported later by the socket is logged, not returned.
   *
   * @param {string} nodeIp Destination address.
   * @param {string} message Text to send.
   * @returns {boolean} `false` when there is no socket, `true` otherwise.
   */
  sendToNode(nodeIp, message) {
    if (!this.socket) {
      return false;
    }

    const buffer = Buffer.from(message, 'utf8');
    this.socket.send(buffer, 0, buffer.length, this.port, nodeIp, (err) => {
      if (err) {
        console.error(`Error sending to node ${nodeIp}:`, err);
      }
    });

    return true;
  }

  /**
   * Broadcasts a UTF-8 encoded message on `this.port`.
   *
   * The return value only tells whether the datagram was handed to the
   * socket; a failure reported later by the socket is logged, not returned.
   *
   * @param {string} message Text to send.
   * @returns {boolean} `false` when there is no socket, `true` otherwise.
   */
  broadcastToAll(message) {
    if (!this.socket) {
      return false;
    }

    const buffer = Buffer.from(message, 'utf8');
    // Already enabled by start() once the socket is bound; kept so the call
    // also works for a socket that was installed by other means.
    this.socket.setBroadcast(true);
    this.socket.send(buffer, 0, buffer.length, this.port, BROADCAST_ADDRESS, (err) => {
      if (err) {
        console.error('Error broadcasting message:', err);
      }
    });

    return true;
  }
}

// The UdpDiscovery class is the module's only export.
module.exports = UdpDiscovery;