diff --git a/README.md b/README.md index 779e071..d4d83b7 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ The backend now includes automatic UDP discovery for SPORE nodes on the network. ### 🚀 How It Works 1. **UDP Server**: The backend listens on port 4210 for UDP messages -2. **Discovery Message**: Nodes send `CLUSTER_DISCOVERY` messages to broadcast address `255.255.255.255:4210` +2. **Heartbeat Message**: Nodes send `CLUSTER_HEARTBEAT` messages to broadcast address `255.255.255.255:4210` 3. **Auto Configuration**: When a discovery message is received, the source IP is automatically used to configure the SporeApiClient 4. **Dynamic Updates**: The system automatically switches to the most recently seen node as the primary connection 5. **Health Monitoring**: Continuous monitoring of node availability with automatic failover @@ -57,7 +57,7 @@ The backend now includes automatic UDP discovery for SPORE nodes on the network. ### 📡 Discovery Protocol - **Port**: 4210 (configurable via `UDP_PORT` constant) -- **Message**: `CLUSTER_DISCOVERY` (configurable via `DISCOVERY_MESSAGE` constant) +- **Message**: `CLUSTER_HEARTBEAT` (configurable via `HEARTBEAT_MESSAGE` constant) - **Broadcast**: `255.255.255.255:4210` - **Protocol**: UDP broadcast listening - **Auto-binding**: Automatically binds to the specified port on startup @@ -72,27 +72,27 @@ npm start # The server will automatically: # - Start HTTP server on port 3001 # - Start UDP discovery server on port 4210 -# - Wait for CLUSTER_DISCOVERY messages +# - Wait for CLUSTER_HEARTBEAT messages ``` #### Node Configuration -SPORE nodes should send discovery messages periodically: +SPORE nodes should send heartbeat messages periodically: ```bash # Recommended: Send every 30-60 seconds -# Message format: "CLUSTER_DISCOVERY" +# Message format: "CLUSTER_HEARTBEAT:hostname" # Target: 255.255.255.255:4210 ``` -### 🌐 Discovery Endpoints +### 🌐 Cluster Endpoints -#### Discovery Management -- `GET 
/api/discovery/nodes` - View all discovered nodes and current status -- `POST /api/discovery/refresh` - Manually trigger discovery refresh +#### Cluster Management +- `GET /api/discovery/nodes` - View all cluster nodes and current status +- `POST /api/discovery/refresh` - Manually trigger cluster refresh - `POST /api/discovery/primary/:ip` - Manually set a specific node as primary - `POST /api/discovery/random-primary` - Randomly select a new primary node #### Health Monitoring -- `GET /api/health` - Comprehensive health check including discovery status +- `GET /api/health` - Comprehensive health check including cluster status ### ๐Ÿงช Testing & Development diff --git a/index.js b/index.js index 34ba222..fab08b7 100644 --- a/index.js +++ b/index.js @@ -41,12 +41,11 @@ app.use(cors({ allowedHeaders: ['Content-Type', 'Authorization'] })); -// UDP discovery configuration +// UDP heartbeat-only configuration const UDP_PORT = 4210; -const DISCOVERY_MESSAGE = 'CLUSTER_DISCOVERY'; -const STALE_THRESHOLD_SECONDS = 3; // 3 seconds for faster detection +const STALE_THRESHOLD_SECONDS = 8; // 8 seconds to accommodate 5-second heartbeat interval -// Initialize UDP server for auto discovery +// Initialize UDP server for heartbeat-based cluster management const udpServer = dgram.createSocket('udp4'); // Store discovered nodes and their IPs @@ -68,107 +67,16 @@ udpServer.on('message', (msg, rinfo) => { const message = msg.toString().trim(); const sourceIp = rinfo.address; const sourcePort = rinfo.port; - - // Only log non-discovery messages to reduce noise - if (message !== DISCOVERY_MESSAGE) { - console.log(`๐Ÿ“จ UDP message received from ${sourceIp}:${sourcePort}: "${message}"`); - } - - if (message === DISCOVERY_MESSAGE) { - //console.log(`Received CLUSTER_DISCOVERY from ${sourceIp}:${sourcePort}`); - // Store the discovered node - const now = Math.floor(Date.now() / 1000); - const nodeInfo = { - ip: sourceIp, - port: sourcePort, - status: 'active', // New nodes from discovery 
are active - discoveredAt: now, - lastSeen: now - }; + console.log(`๐Ÿ“จ UDP message received from ${sourceIp}:${sourcePort}: "${message}"`); - const isNewNode = !discoveredNodes.has(sourceIp); - discoveredNodes.set(sourceIp, nodeInfo); - - // Set as primary node if this is the first one or if we don't have one - if (!primaryNodeIp) { - primaryNodeIp = sourceIp; - console.log(`Set primary node to ${sourceIp}`); - - // Immediately try to initialize the client - updateSporeClient(); - } - - // Update last seen timestamp - discoveredNodes.get(sourceIp).lastSeen = Math.floor(Date.now() / 1000); - - // Broadcast discovery event if this is a new node - if (isNewNode) { - console.log(`๐Ÿ†• NEW NODE DISCOVERED: ${sourceIp}:${sourcePort} via CLUSTER_DISCOVERY. Total nodes: ${discoveredNodes.size}`); - broadcastNodeDiscovery(sourceIp, 'discovered'); - // Broadcast cluster update after a short delay to allow member data to be fetched - setTimeout(() => broadcastMemberListChange('new discovery'), 1000); - } - } else if (message.startsWith('CLUSTER_HEARTBEAT:')) { - // Handle heartbeat messages that also update member list + if (message.startsWith('CLUSTER_HEARTBEAT:')) { + // Handle heartbeat messages that update member list const hostname = message.substring('CLUSTER_HEARTBEAT:'.length); - // Update or create node entry from heartbeat - const existingNode = discoveredNodes.get(sourceIp); - const now = Math.floor(Date.now() / 1000); // Use Unix timestamp for consistency - - if (existingNode) { - // Update existing node - const wasStale = existingNode.status === 'inactive'; - const oldHostname = existingNode.hostname; - - existingNode.lastSeen = now; - existingNode.hostname = hostname; - existingNode.status = 'active'; // Mark as active when heartbeat received - - logger.debug(`๐Ÿ’“ Heartbeat from ${sourceIp}:${sourcePort} (${hostname}). 
Total nodes: ${discoveredNodes.size}`); - - // Check if hostname changed - const hostnameChanged = oldHostname !== hostname; - if (hostnameChanged) { - console.log(`๐Ÿ”„ Hostname updated for ${sourceIp}: "${oldHostname}" -> "${hostname}"`); - } - - // ALWAYS broadcast every heartbeat for immediate UI updates - // This ensures the UI gets real-time updates without delay - const reason = wasStale ? 'node became active' : - hostnameChanged ? 'hostname update' : - 'active heartbeat'; - - logger.debug(`๐Ÿ“ก Broadcasting heartbeat update: ${reason}`); - broadcastMemberListChange(reason); - } else { - // Create new node entry from heartbeat - NEW NODE DISCOVERED - const nodeInfo = { - ip: sourceIp, - port: sourcePort, - hostname: hostname, - status: 'active', // New nodes from heartbeat are active - discoveredAt: now, - lastSeen: now - }; - discoveredNodes.set(sourceIp, nodeInfo); - - console.log(`๐Ÿ†• NEW NODE DISCOVERED: ${sourceIp}:${sourcePort} (${hostname}) via heartbeat. Total nodes: ${discoveredNodes.size}`); - - // Set as primary node if this is the first one or if we don't have one - if (!primaryNodeIp) { - primaryNodeIp = sourceIp; - console.log(`Set primary node to ${sourceIp} (from heartbeat)`); - - // Immediately try to initialize the client - updateSporeClient(); - } - - // Broadcast discovery event for new node from heartbeat - broadcastNodeDiscovery(sourceIp, 'discovered'); - // Broadcast cluster update after a short delay to allow member data to be fetched - setTimeout(() => broadcastMemberListChange('new heartbeat discovery'), 1000); - } + updateNodeFromHeartbeat(sourceIp, sourcePort, hostname); + } else if (message.startsWith('NODE_UPDATE:')) { + // Handle node update messages that provide detailed node info + handleNodeUpdate(sourceIp, message); } else if (!message.startsWith('RAW:')) { console.log(`Received unknown message from ${sourceIp}:${sourcePort}: "${message}"`); } @@ -179,12 +87,12 @@ udpServer.on('message', (msg, rinfo) => { 
udpServer.on('listening', () => { const address = udpServer.address(); - console.log(`UDP discovery server listening on ${address.address}:${address.port}`); + console.log(`UDP heartbeat server listening on ${address.address}:${address.port}`); }); -// Bind UDP server to listen for discovery messages +// Bind UDP server to listen for heartbeat messages udpServer.bind(UDP_PORT, () => { - console.log(`UDP discovery server bound to port ${UDP_PORT}`); + console.log(`UDP heartbeat server bound to port ${UDP_PORT}`); }); // Initialize the SPORE API client with dynamic IP @@ -215,7 +123,7 @@ function markStaleNodes() { for (const [ip, node] of discoveredNodes.entries()) { const timeSinceLastSeen = now - node.lastSeen; - + if (timeSinceLastSeen > STALE_THRESHOLD_SECONDS && node.status !== 'inactive') { console.log(`๐Ÿ’€ NODE MARKED INACTIVE: ${ip} (${node.hostname || 'Unknown'}) - last seen ${timeSinceLastSeen}s ago (threshold: ${STALE_THRESHOLD_SECONDS}s)`); node.status = 'inactive'; @@ -358,6 +266,109 @@ async function performWithFailover(operation) { throw lastError || new Error('All discovered nodes failed'); } +// Function to update node from heartbeat message +function updateNodeFromHeartbeat(sourceIp, sourcePort, hostname) { + const existingNode = discoveredNodes.get(sourceIp); + const now = Math.floor(Date.now() / 1000); + + if (existingNode) { + // Update existing node + const wasStale = existingNode.status === 'inactive'; + const oldHostname = existingNode.hostname; + + existingNode.lastSeen = now; + existingNode.hostname = hostname; + existingNode.status = 'active'; // Mark as active when heartbeat received + + logger.debug(`๐Ÿ’“ Heartbeat from ${sourceIp}:${sourcePort} (${hostname}). 
Total nodes: ${discoveredNodes.size}`); + + // Check if hostname changed + const hostnameChanged = oldHostname !== hostname; + if (hostnameChanged) { + console.log(`๐Ÿ”„ Hostname updated for ${sourceIp}: "${oldHostname}" -> "${hostname}"`); + } + + // Broadcast heartbeat update for immediate UI updates + const reason = wasStale ? 'node became active' : + hostnameChanged ? 'hostname update' : + 'active heartbeat'; + + logger.debug(`๐Ÿ“ก Broadcasting heartbeat update: ${reason}`); + broadcastMemberListChange(reason); + } else { + // Create new node entry from heartbeat - NEW NODE DISCOVERED + const nodeInfo = { + ip: sourceIp, + port: sourcePort, + hostname: hostname, + status: 'active', + discoveredAt: now, + lastSeen: now + }; + discoveredNodes.set(sourceIp, nodeInfo); + + console.log(`๐Ÿ†• NEW NODE DISCOVERED: ${sourceIp}:${sourcePort} (${hostname}) via heartbeat. Total nodes: ${discoveredNodes.size}`); + + // Set as primary node if this is the first one or if we don't have one + if (!primaryNodeIp) { + primaryNodeIp = sourceIp; + console.log(`Set primary node to ${sourceIp} (from heartbeat)`); + updateSporeClient(); + } + + // Broadcast discovery event for new node from heartbeat + broadcastNodeDiscovery(sourceIp, 'discovered'); + // Broadcast cluster update after a short delay to allow member data to be fetched + setTimeout(() => broadcastMemberListChange('new heartbeat discovery'), 1000); + } +} + +// Function to handle NODE_UPDATE messages +function handleNodeUpdate(sourceIp, message) { + // Message format: "NODE_UPDATE:hostname:{json}" + const parts = message.split(':'); + if (parts.length < 3) { + console.warn(`Invalid NODE_UPDATE message format: ${message}`); + return; + } + + const hostname = parts[1]; + const jsonData = parts.slice(2).join(':'); + + try { + const nodeData = JSON.parse(jsonData); + + // Update the specific node with the new information + const existingNode = discoveredNodes.get(sourceIp); + if (existingNode) { + // Update hostname if 
provided + if (nodeData.hostname) { + existingNode.hostname = nodeData.hostname; + } + + // Update uptime if provided + if (nodeData.uptime) { + existingNode.uptime = nodeData.uptime; + } + + // Update labels if provided + if (nodeData.labels) { + existingNode.labels = nodeData.labels; + } + + existingNode.lastSeen = Math.floor(Date.now() / 1000); + existingNode.status = 'active'; + + console.log(`๐Ÿ”„ Updated node ${sourceIp} (${hostname}) from NODE_UPDATE`); + broadcastMemberListChange('node update'); + } else { + console.warn(`Received NODE_UPDATE for unknown node: ${sourceIp} (${hostname})`); + } + } catch (error) { + console.error(`Error parsing NODE_UPDATE JSON: ${error.message}`); + } +} + // Set up periodic tasks setInterval(() => { markStaleNodes(); @@ -374,7 +385,7 @@ app.get('/', (req, res) => { res.sendFile(path.join(__dirname, 'public', 'index.html')); }); -// API endpoint to get discovered nodes +// API endpoint to get cluster nodes (heartbeat-based) app.get('/api/discovery/nodes', (req, res) => { const nodes = Array.from(discoveredNodes.values()).map(node => ({ ...node, @@ -382,22 +393,21 @@ app.get('/api/discovery/nodes', (req, res) => { lastSeen: new Date(node.lastSeen * 1000).toISOString(), isPrimary: node.ip === primaryNodeIp })); - + res.json({ primaryNode: primaryNodeIp, totalNodes: discoveredNodes.size, nodes: nodes, clientInitialized: !!sporeClient, clientBaseUrl: sporeClient ? 
sporeClient.baseUrl : null, - discoveryStatus: { + clusterStatus: { udpPort: UDP_PORT, - discoveryMessage: DISCOVERY_MESSAGE, serverRunning: udpServer.listening } }); }); -// API endpoint to manually trigger discovery refresh +// API endpoint to manually trigger cluster refresh app.post('/api/discovery/refresh', (req, res) => { try { // Mark stale nodes as inactive @@ -411,15 +421,15 @@ app.post('/api/discovery/refresh', (req, res) => { res.json({ success: true, - message: 'Discovery refresh completed', + message: 'Cluster refresh completed', primaryNode: primaryNodeIp, totalNodes: discoveredNodes.size, clientInitialized: !!sporeClient }); } catch (error) { - console.error('Error during discovery refresh:', error); + console.error('Error during cluster refresh:', error); res.status(500).json({ - error: 'Discovery refresh failed', + error: 'Cluster refresh failed', message: error.message }); } @@ -526,7 +536,7 @@ app.get('/api/cluster/members', async (req, res) => { if (discoveredNodes.size === 0) { return res.status(503).json({ error: 'Service unavailable', - message: 'No SPORE nodes discovered yet. Waiting for CLUSTER_DISCOVERY messages...', + message: 'No SPORE nodes discovered yet. Waiting for CLUSTER_HEARTBEAT messages...', discoveredNodes: Array.from(discoveredNodes.keys()) }); } @@ -564,7 +574,7 @@ app.get('/api/tasks/status', async (req, res) => { if (discoveredNodes.size === 0) { return res.status(503).json({ error: 'Service unavailable', - message: 'No SPORE nodes discovered yet. Waiting for CLUSTER_DISCOVERY messages...', + message: 'No SPORE nodes discovered yet. Waiting for CLUSTER_HEARTBEAT messages...', discoveredNodes: Array.from(discoveredNodes.keys()) }); } @@ -624,7 +634,7 @@ app.get('/api/node/endpoints', async (req, res) => { if (discoveredNodes.size === 0) { return res.status(503).json({ error: 'Service unavailable', - message: 'No SPORE nodes discovered yet. Waiting for CLUSTER_DISCOVERY messages...', + message: 'No SPORE nodes discovered yet. 
Waiting for CLUSTER_HEARTBEAT messages...', discoveredNodes: Array.from(discoveredNodes.keys()) }); } @@ -867,28 +877,28 @@ app.get('/api/health', (req, res) => { udp: udpServer.listening, sporeClient: !!sporeClient }, - discovery: { + cluster: { totalNodes: discoveredNodes.size, primaryNode: primaryNodeIp, udpPort: UDP_PORT, serverRunning: udpServer.listening } }; - + // If no nodes discovered, mark as degraded if (discoveredNodes.size === 0) { health.status = 'degraded'; health.message = 'No SPORE nodes discovered yet'; } - + // If no client initialized, mark as degraded if (!sporeClient) { health.status = 'degraded'; - health.message = health.message ? - `${health.message}; SPORE client not initialized` : + health.message = health.message ? + `${health.message}; SPORE client not initialized` : 'SPORE client not initialized'; } - + const statusCode = health.status === 'healthy' ? 200 : 503; res.status(statusCode).json(health); }); @@ -1106,20 +1116,20 @@ function initializeWebSocketServer(httpServer) { const server = app.listen(PORT, '0.0.0.0', () => { console.log(`Server is running on http://0.0.0.0:${PORT}`); console.log(`Accessible from: http://YOUR_COMPUTER_IP:${PORT}`); - console.log(`UDP discovery server listening on port ${UDP_PORT}`); + console.log(`UDP heartbeat server listening on port ${UDP_PORT}`); // Initialize WebSocket server after HTTP server is running initializeWebSocketServer(server); console.log('WebSocket server ready for real-time updates'); - console.log('Waiting for CLUSTER_DISCOVERY messages from SPORE nodes...'); + console.log('Waiting for CLUSTER_HEARTBEAT and NODE_UPDATE messages from SPORE nodes...'); }); // Graceful shutdown handling process.on('SIGINT', () => { console.log('\nReceived SIGINT. 
Shutting down gracefully...'); udpServer.close(() => { - console.log('UDP server closed.'); + console.log('UDP heartbeat server closed.'); }); server.close(() => { console.log('HTTP server closed.'); @@ -1130,7 +1140,7 @@ process.on('SIGINT', () => { process.on('SIGTERM', () => { console.log('\nReceived SIGTERM. Shutting down gracefully...'); udpServer.close(() => { - console.log('UDP server closed.'); + console.log('UDP heartbeat server closed.'); }); server.close(() => { console.log('HTTP server closed.'); diff --git a/src/client/example.js b/src/client/example.js index b887aa0..ea00a4a 100644 --- a/src/client/example.js +++ b/src/client/example.js @@ -23,8 +23,8 @@ async function main() { await runExamples(client); } else { console.log('โŒ No nodes discovered yet.'); - console.log('๐Ÿ’ก Start the backend server and send CLUSTER_DISCOVERY messages'); - console.log('๐Ÿ’ก Use: npm run test-discovery broadcast'); + console.log('๐Ÿ’ก Start the backend server and send CLUSTER_HEARTBEAT messages'); + console.log('๐Ÿ’ก Use: npm run test-heartbeat broadcast'); return; } } catch (error) {