const express = require('express');
const path = require('path');
const fs = require('fs');
const dgram = require('dgram');
const SporeApiClient = require('./src/client');
const cors = require('cors');
const WebSocket = require('ws');

// Simple logging utility with level control.
// Debug output is only emitted when LOG_LEVEL=debug or NODE_ENV=development.
const logger = {
  debug: (...args) => {
    if (process.env.LOG_LEVEL === 'debug' || process.env.NODE_ENV === 'development') {
      console.log('[DEBUG]', ...args);
    }
  },
  info: (...args) => console.log('[INFO]', ...args),
  warn: (...args) => console.warn('[WARN]', ...args),
  error: (...args) => console.error('[ERROR]', ...args)
};

const app = express();
const PORT = process.env.PORT || 3001;

// Middleware
app.use(express.json());
app.use(express.urlencoded({ extended: true }));

// File upload middleware
const fileUpload = require('express-fileupload');
app.use(fileUpload({
  limits: { fileSize: 50 * 1024 * 1024 }, // 50MB limit
  abortOnLimit: true,
  responseOnLimit: 'File size limit has been reached',
  debug: false
}));

// Add CORS middleware
app.use(cors({
  origin: '*', // Or specify your phone's IP range like: ['http://192.168.1.0/24']
  methods: ['GET', 'POST', 'PUT', 'DELETE'],
  allowedHeaders: ['Content-Type', 'Authorization']
}));

// UDP heartbeat-only configuration
const UDP_PORT = 4210;
const STALE_THRESHOLD_SECONDS = 8; // 8 seconds to accommodate 5-second heartbeat interval

// Initialize UDP server for heartbeat-based cluster management
const udpServer = dgram.createSocket('udp4');

// Discovered nodes keyed by IP. Timestamps (discoveredAt / lastSeen) are epoch seconds.
const discoveredNodes = new Map();
let primaryNodeIp = null;

// Dispatch table for UDP messages, keyed by topic (everything up to and
// including the first ":"). Each handler receives (payload, sourceIp, sourcePort).
const udpMessageHandlers = new Map([
  ['cluster/heartbeat:', (payload, sourceIp, sourcePort) => {
    // The hostname is the first colon-delimited segment of the payload.
    const hostname = payload.split(':', 1)[0];
    updateNodeFromHeartbeat(sourceIp, sourcePort, hostname);
  }],
  ['node/update:', (payload, sourceIp) => {
    handleNodeUpdate(sourceIp, 'node/update:' + payload);
  }],
  ['raw:', () => {
    // Ignore raw messages
  }]
]);

// UDP server event handlers
udpServer.on('error', (err) => {
  if (err.code === 'EADDRINUSE') {
    console.error(`UDP port ${UDP_PORT} is already in use. Please check if another instance is running.`);
  } else {
    console.error('UDP Server error:', err);
  }
  udpServer.close();
});

udpServer.on('message', (msg, rinfo) => {
  try {
    const message = msg.toString().trim();
    const sourceIp = rinfo.address;
    const sourcePort = rinfo.port;

    console.log(`📨 UDP message received from ${sourceIp}:${sourcePort}: "${message}"`);

    // Split on the FIRST ":" only. String.prototype.split(':', 2) would discard
    // everything after the second colon and truncate JSON payloads.
    const separatorIndex = message.indexOf(':');
    if (separatorIndex === -1) {
      console.log(`Invalid message format from ${sourceIp}:${sourcePort}: "${message}"`);
      return;
    }

    const topic = message.slice(0, separatorIndex + 1);
    const payload = message.slice(separatorIndex + 1);

    // Look up and execute handler
    const handler = udpMessageHandlers.get(topic);
    if (handler) {
      handler(payload, sourceIp, sourcePort);
    } else {
      console.log(`Received unknown message from ${sourceIp}:${sourcePort}: "${message}"`);
    }
  } catch (error) {
    console.error('Error processing UDP message:', error);
  }
});

udpServer.on('listening', () => {
  const address = udpServer.address();
  console.log(`UDP heartbeat server listening on ${address.address}:${address.port}`);
});

// Bind UDP server to listen for heartbeat messages
udpServer.bind(UDP_PORT, () => {
  console.log(`UDP heartbeat server bound to port ${UDP_PORT}`);
});

// The SPORE API client for the current primary node (null until one is selected)
let sporeClient = null;

/**
 * Create a SporeApiClient for the given node.
 * @param {string} nodeIp - IP address of the node.
 * @returns {object|null} The client, or null when no IP is given or construction fails.
 */
function initializeSporeClient(nodeIp) {
  if (!nodeIp) {
    console.warn('No node IP available for SporeApiClient initialization');
    return null;
  }

  try {
    const client = new SporeApiClient(`http://${nodeIp}`);
    console.log(`Initialized SporeApiClient with node IP: ${nodeIp}`);
    return client;
  } catch (error) {
    console.error(`Failed to initialize SporeApiClient with IP ${nodeIp}:`, error);
    return null;
  }
}

/**
 * Mark nodes whose last heartbeat is older than STALE_THRESHOLD_SECONDS as
 * 'inactive' (they are kept in the map, not removed). If the primary node went
 * stale, a new primary is elected once ALL stale nodes have been marked, so the
 * election never picks a node that is about to be marked stale in the same pass.
 */
function markStaleNodes() {
  const now = Math.floor(Date.now() / 1000);
  let nodesMarkedStale = false;
  let primaryWentStale = false;

  for (const [ip, node] of discoveredNodes.entries()) {
    const timeSinceLastSeen = now - node.lastSeen;
    if (timeSinceLastSeen <= STALE_THRESHOLD_SECONDS || node.status === 'inactive') {
      continue;
    }

    console.log(`💀 NODE MARKED INACTIVE: ${ip} (${node.hostname || 'Unknown'}) - last seen ${timeSinceLastSeen}s ago (threshold: ${STALE_THRESHOLD_SECONDS}s)`);
    node.status = 'inactive';
    nodesMarkedStale = true;

    // Broadcast stale node event immediately
    logger.debug(`📡 Broadcasting stale node event for ${ip}`);
    broadcastNodeDiscovery(ip, 'stale');

    if (primaryNodeIp === ip) {
      primaryNodeIp = null;
      primaryWentStale = true;
      console.log('🚫 PRIMARY NODE BECAME STALE: Clearing primary node selection');
    }
  }

  if (primaryWentStale) {
    // Automatically select a new primary node from remaining healthy nodes
    const newPrimary = selectBestPrimaryNode();
    if (newPrimary) {
      console.log(`✅ NEW PRIMARY NODE SELECTED: ${newPrimary} (auto-selected after stale cleanup)`);
      // Update the SPORE client to use the new primary node
      updateSporeClient();
    } else {
      console.log('⚠️ No healthy nodes available for primary selection');
      // Drop the client that still points at the stale node.
      sporeClient = null;
    }
  }

  // Broadcast cluster update if any nodes were marked stale
  if (nodesMarkedStale) {
    broadcastMemberListChange('nodes marked stale');
  }
}

/**
 * Return the current primary node, electing one if necessary.
 * An existing primary that is still known is kept. Otherwise the most recently
 * seen node that is not marked 'inactive' becomes primary.
 * @returns {string|null} IP of the primary node, or null if no healthy node exists.
 */
function selectBestPrimaryNode() {
  if (discoveredNodes.size === 0) {
    return null;
  }

  // If we already have a valid primary node, keep it
  if (primaryNodeIp && discoveredNodes.has(primaryNodeIp)) {
    return primaryNodeIp;
  }

  // Select the most recently seen healthy node as primary
  let bestNode = null;
  let mostRecent = 0;
  for (const [ip, node] of discoveredNodes.entries()) {
    if (node.status === 'inactive') {
      continue;
    }
    if (node.lastSeen > mostRecent) {
      mostRecent = node.lastSeen;
      bestNode = ip;
    }
  }

  if (bestNode && bestNode !== primaryNodeIp) {
    primaryNodeIp = bestNode;
    console.log(`Selected new primary node: ${bestNode}`);
    broadcastMemberListChange('primary node change');
  }

  return bestNode;
}

/**
 * Pick a random discovered node other than the current primary and make it primary.
 * @returns {string|null} The new primary IP, the unchanged primary when it is the
 *   only node, or null when no nodes are known.
 */
function selectRandomPrimaryNode() {
  if (discoveredNodes.size === 0) {
    return null;
  }

  // Candidates are all discovered nodes except the current primary
  const availableNodes = Array.from(discoveredNodes.keys()).filter((ip) => ip !== primaryNodeIp);
  if (availableNodes.length === 0) {
    // If no other nodes available, keep current primary
    return primaryNodeIp;
  }

  const randomIndex = Math.floor(Math.random() * availableNodes.length);
  const randomNode = availableNodes[randomIndex];

  primaryNodeIp = randomNode;
  console.log(`Randomly selected new primary node: ${randomNode}`);
  broadcastMemberListChange('random primary node selection');

  return randomNode;
}

/**
 * Point the shared SporeApiClient at the current (or newly elected) primary node.
 * Leaves the existing client untouched when no primary can be selected.
 */
function updateSporeClient() {
  const nodeIp = selectBestPrimaryNode();
  if (nodeIp) {
    sporeClient = initializeSporeClient(nodeIp);
  }
}

/**
 * Perform an operation against the current primary, failing over to other
 * discovered nodes if needed. On success against a non-primary node, that node
 * becomes the new primary.
 * @param {(client: object, ip: string) => Promise<*>} operation - Operation to run.
 * @returns {Promise<*>} Result of the first successful attempt.
 * @throws {Error} When no nodes are known, or the last error when all attempts fail.
 */
async function performWithFailover(operation) {
  // Build candidate list: current primary first, then healthy nodes by most
  // recently seen, then inactive nodes as a last resort.
  const candidateIps = [];
  if (primaryNodeIp && discoveredNodes.has(primaryNodeIp)) {
    candidateIps.push(primaryNodeIp);
  }

  const inactiveRank = (node) => (node.status === 'inactive' ? 1 : 0);
  const others = Array.from(discoveredNodes.values())
    .filter((node) => node.ip !== primaryNodeIp)
    .sort((a, b) => inactiveRank(a) - inactiveRank(b) || b.lastSeen - a.lastSeen)
    .map((node) => node.ip);
  candidateIps.push(...others);

  if (candidateIps.length === 0) {
    throw new Error('No SPORE nodes discovered');
  }

  let lastError = null;
  for (const ip of candidateIps) {
    try {
      // Reuse the shared client for the primary; build a temporary one otherwise.
      const client = (sporeClient && ip === primaryNodeIp) ? sporeClient : initializeSporeClient(ip);
      if (!client) {
        throw new Error(`Failed to initialize client for ${ip}`);
      }

      const result = await operation(client, ip);

      if (ip !== primaryNodeIp) {
        primaryNodeIp = ip;
        sporeClient = client;
        logger.info(`Failover: switched primary node to ${ip}`);
        broadcastMemberListChange('failover primary node switch');
      }
      return result;
    } catch (err) {
      console.warn(`Primary attempt on ${ip} failed: ${err.message}`);
      lastError = err;
    }
  }

  throw lastError || new Error('All discovered nodes failed');
}

/**
 * Record a heartbeat: refresh an existing node or register a new one.
 * @param {string} sourceIp - Sender IP address.
 * @param {number} sourcePort - Sender UDP port.
 * @param {string} hostname - Hostname carried in the heartbeat payload.
 */
function updateNodeFromHeartbeat(sourceIp, sourcePort, hostname) {
  const existingNode = discoveredNodes.get(sourceIp);
  const now = Math.floor(Date.now() / 1000);

  if (existingNode) {
    // Update existing node
    const wasStale = existingNode.status === 'inactive';
    const oldHostname = existingNode.hostname;

    existingNode.lastSeen = now;
    existingNode.hostname = hostname;
    existingNode.status = 'active'; // Mark as active when heartbeat received

    logger.debug(`💓 Heartbeat from ${sourceIp}:${sourcePort} (${hostname}). Total nodes: ${discoveredNodes.size}`);

    // Check if hostname changed
    const hostnameChanged = oldHostname !== hostname;
    if (hostnameChanged) {
      console.log(`🔄 Hostname updated for ${sourceIp}: "${oldHostname}" -> "${hostname}"`);
    }

    // A node that came back while there is no primary can take over right away.
    if (!primaryNodeIp) {
      updateSporeClient();
    }

    // Broadcast heartbeat update for immediate UI updates
    let reason = 'active heartbeat';
    if (wasStale) {
      reason = 'node became active';
    } else if (hostnameChanged) {
      reason = 'hostname update';
    }

    logger.debug(`📡 Broadcasting heartbeat update: ${reason}`);
    broadcastMemberListChange(reason);
    return;
  }

  // Create new node entry from heartbeat - NEW NODE DISCOVERED
  const nodeInfo = {
    ip: sourceIp,
    port: sourcePort,
    hostname: hostname,
    status: 'active',
    discoveredAt: now,
    lastSeen: now
  };
  discoveredNodes.set(sourceIp, nodeInfo);

  console.log(`🆕 NEW NODE DISCOVERED: ${sourceIp}:${sourcePort} (${hostname}) via heartbeat. Total nodes: ${discoveredNodes.size}`);

  // Set as primary node if this is the first one or if we don't have one
  if (!primaryNodeIp) {
    primaryNodeIp = sourceIp;
    console.log(`Set primary node to ${sourceIp} (from heartbeat)`);
    updateSporeClient();
  }

  // Broadcast discovery event for new node from heartbeat
  broadcastNodeDiscovery(sourceIp, 'discovered');

  // Broadcast cluster update after a short delay to allow member data to be fetched
  setTimeout(() => broadcastMemberListChange('new heartbeat discovery'), 1000);
}

/**
 * Apply a node/update message to an already-known node.
 * @param {string} sourceIp - Sender IP address.
 * @param {string} message - Full message in the form "node/update:hostname:{json}".
 */
function handleNodeUpdate(sourceIp, message) {
  // Message format: "node/update:hostname:{json}"
  const parts = message.split(':');
  if (parts.length < 3) {
    console.warn(`Invalid NODE_UPDATE message format: ${message}`);
    return;
  }

  const hostname = parts[1];
  // The JSON itself contains colons, so re-join everything after the hostname.
  const jsonData = parts.slice(2).join(':');

  try {
    const nodeData = JSON.parse(jsonData);

    // Update the specific node with the new information
    const existingNode = discoveredNodes.get(sourceIp);
    if (!existingNode) {
      console.warn(`Received NODE_UPDATE for unknown node: ${sourceIp} (${hostname})`);
      return;
    }

    // Update hostname if provided
    if (nodeData.hostname) {
      existingNode.hostname = nodeData.hostname;
    }

    // Update uptime if provided (0 is a valid uptime, so test for presence)
    if (nodeData.uptime != null) {
      existingNode.uptime = nodeData.uptime;
    }

    // Update labels if provided
    if (nodeData.labels) {
      existingNode.labels = nodeData.labels;
    }

    existingNode.lastSeen = Math.floor(Date.now() / 1000);
    existingNode.status = 'active';

    console.log(`🔄 Updated node ${sourceIp} (${hostname}) from NODE_UPDATE`);
    broadcastMemberListChange('node update');
  } catch (error) {
    console.error(`Error parsing NODE_UPDATE JSON: ${error.message}`);
  }
}

// Set up periodic tasks
setInterval(() => {
  markStaleNodes();
  if (!sporeClient || !primaryNodeIp || !discoveredNodes.has(primaryNodeIp)) {
    updateSporeClient();
  }
}, 2000); // Check every 2 seconds for faster stale detection

// Serve static files from public directory
app.use(express.static(path.join(__dirname, 'public')));

/**
 * Format a stored timestamp as an ISO-8601 string.
 * Node timestamps are epoch seconds; Date instances are tolerated for robustness.
 * @param {number|Date} value - Epoch seconds or a Date.
 * @returns {string|null} ISO string, or null when the value cannot be converted.
 */
function toIsoTimestamp(value) {
  const date = value instanceof Date ? value : new Date(Number(value) * 1000);
  return Number.isNaN(date.getTime()) ? null : date.toISOString();
}

/**
 * Send the standard 503 response used when no node has been discovered yet.
 * @param {object} res - Express response.
 * @returns {object} The response, for `return sendNoNodesDiscovered(res)` chaining.
 */
function sendNoNodesDiscovered(res) {
  return res.status(503).json({
    error: 'Service unavailable',
    message: 'No SPORE nodes discovered yet. Waiting for CLUSTER_HEARTBEAT messages...',
    discoveredNodes: Array.from(discoveredNodes.keys())
  });
}

/**
 * Escape a string for literal use inside a RegExp pattern.
 * @param {string} text - Raw text.
 * @returns {string} Text with regex metacharacters escaped.
 */
function escapeRegExp(text) {
  return String(text).replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}

// Serve the main HTML page
app.get('/', (req, res) => {
  res.sendFile(path.join(__dirname, 'public', 'index.html'));
});

// API endpoint to get cluster nodes (heartbeat-based)
app.get('/api/discovery/nodes', (req, res) => {
  const nodes = Array.from(discoveredNodes.values()).map((node) => ({
    ...node,
    discoveredAt: toIsoTimestamp(node.discoveredAt),
    lastSeen: toIsoTimestamp(node.lastSeen),
    isPrimary: node.ip === primaryNodeIp
  }));

  res.json({
    primaryNode: primaryNodeIp,
    totalNodes: discoveredNodes.size,
    nodes: nodes,
    clientInitialized: !!sporeClient,
    clientBaseUrl: sporeClient ? sporeClient.baseUrl : null,
    clusterStatus: {
      udpPort: UDP_PORT,
      serverRunning: udpServer.listening
    }
  });
});

// API endpoint to manually trigger cluster refresh
app.post('/api/discovery/refresh', (req, res) => {
  try {
    // Mark stale nodes as inactive
    markStaleNodes();

    // Try to update the client
    updateSporeClient();

    // Broadcast cluster update via WebSocket
    broadcastMemberListChange('manual refresh');

    res.json({
      success: true,
      message: 'Cluster refresh completed',
      primaryNode: primaryNodeIp,
      totalNodes: discoveredNodes.size,
      clientInitialized: !!sporeClient
    });
  } catch (error) {
    console.error('Error during cluster refresh:', error);
    res.status(500).json({
      error: 'Cluster refresh failed',
      message: error.message
    });
  }
});

// API endpoint to test WebSocket broadcasting
app.post('/api/test/websocket', (req, res) => {
  try {
    console.log('🧪 Manual WebSocket test triggered');
    broadcastMemberListChange('manual test');

    res.json({
      success: true,
      message: 'WebSocket test broadcast sent',
      websocketClients: wsClients.size,
      totalNodes: discoveredNodes.size
    });
  } catch (error) {
    console.error('Error during WebSocket test:', error);
    res.status(500).json({
      error: 'WebSocket test failed',
      message: error.message
    });
  }
});

// API endpoint to randomly select a new primary node
app.post('/api/discovery/random-primary', (req, res) => {
  try {
    if (discoveredNodes.size === 0) {
      return res.status(404).json({
        error: 'No nodes available',
        message: 'No SPORE nodes have been discovered yet'
      });
    }

    // Randomly select a new primary node
    const randomNode = selectRandomPrimaryNode();
    if (!randomNode) {
      return res.status(500).json({
        error: 'Selection failed',
        message: 'Failed to select a random primary node'
      });
    }

    // Update the client with the new primary node
    updateSporeClient();

    // Echo the caller's timestamp when provided, otherwise use the current time
    const timestamp = (req.body && req.body.timestamp) || new Date().toISOString();

    res.json({
      success: true,
      message: `Randomly selected new primary node: ${randomNode}`,
      primaryNode: primaryNodeIp,
      totalNodes: discoveredNodes.size,
      clientInitialized: !!sporeClient,
      timestamp: timestamp
    });
  } catch (error) {
    console.error('Error selecting random primary node:', error);
    res.status(500).json({
      error: 'Random selection failed',
      message: error.message
    });
  }
});

// API endpoint to manually set primary node
app.post('/api/discovery/primary/:ip', (req, res) => {
  try {
    const requestedIp = req.params.ip;

    if (!discoveredNodes.has(requestedIp)) {
      return res.status(404).json({
        error: 'Node not found',
        message: `Node with IP ${requestedIp} has not been discovered`
      });
    }

    primaryNodeIp = requestedIp;
    updateSporeClient();
    broadcastMemberListChange('manual primary node setting');

    res.json({
      success: true,
      message: `Primary node set to ${requestedIp}`,
      primaryNode: primaryNodeIp,
      clientInitialized: !!sporeClient
    });
  } catch (error) {
    console.error('Error setting primary node:', error);
    res.status(500).json({
      error: 'Failed to set primary node',
      message: error.message
    });
  }
});

// API endpoint to get cluster members
app.get('/api/cluster/members', async (req, res) => {
  try {
    if (discoveredNodes.size === 0) {
      return sendNoNodesDiscovered(res);
    }

    const members = await performWithFailover((client) => client.getClusterStatus());
    res.json(members);
  } catch (error) {
    console.error('Error fetching cluster members:', error);
    res.status(502).json({
      error: 'Failed to fetch cluster members',
      message: error.message
    });
  }
});

// API endpoint to get task status (optionally for a specific node via ?ip=)
// NOTE(review): `ip` is caller-supplied and used verbatim as the request target;
// consider restricting it to discovered nodes if this API is exposed beyond a trusted LAN.
app.get('/api/tasks/status', async (req, res) => {
  try {
    const { ip } = req.query;

    if (ip) {
      try {
        const nodeClient = new SporeApiClient(`http://${ip}`);
        const taskStatus = await nodeClient.getTaskStatus();
        return res.json(taskStatus);
      } catch (innerError) {
        console.error('Error fetching task status from specific node:', innerError);
        return res.status(500).json({
          error: 'Failed to fetch task status from node',
          message: innerError.message
        });
      }
    }

    if (discoveredNodes.size === 0) {
      return sendNoNodesDiscovered(res);
    }

    const taskStatus = await performWithFailover((client) => client.getTaskStatus());
    res.json(taskStatus);
  } catch (error) {
    console.error('Error fetching task status:', error);
    res.status(502).json({
      error: 'Failed to fetch task status',
      message: error.message
    });
  }
});

// API endpoint to get system status
app.get('/api/node/status', async (req, res) => {
  try {
    if (discoveredNodes.size === 0) {
      return sendNoNodesDiscovered(res);
    }

    const systemStatus = await performWithFailover((client) => client.getSystemStatus());
    res.json(systemStatus);
  } catch (error) {
    console.error('Error fetching system status:', error);
    res.status(502).json({
      error: 'Failed to fetch system status',
      message: error.message
    });
  }
});

// Proxy endpoint to get node capabilities (optionally for a specific node via ?ip=)
app.get('/api/node/endpoints', async (req, res) => {
  try {
    const { ip } = req.query;

    if (ip) {
      try {
        const nodeClient = new SporeApiClient(`http://${ip}`);
        const caps = await nodeClient.getCapabilities();
        return res.json(caps);
      } catch (innerError) {
        console.error('Error fetching endpoints from specific node:', innerError);
        return res.status(500).json({
          error: 'Failed to fetch endpoints from node',
          message: innerError.message
        });
      }
    }

    if (discoveredNodes.size === 0) {
      return sendNoNodesDiscovered(res);
    }

    const caps = await performWithFailover((client) => client.getCapabilities());
    return res.json(caps);
  } catch (error) {
    console.error('Error fetching capabilities:', error);
    return res.status(502).json({
      error: 'Failed to fetch capabilities',
      message: error.message
    });
  }
});

// Generic proxy to call a node capability directly.
// Body: { ip, method, uri, params: [{ name, value, location: 'query'|'path'|'body' }] }
// NOTE(review): `ip` is caller-supplied and used verbatim as the request host;
// consider restricting it to discovered nodes if this API is exposed beyond a trusted LAN.
app.post('/api/proxy-call', async (req, res) => {
  try {
    const { ip, method, uri, params } = req.body || {};

    if (!ip || !method || !uri) {
      return res.status(400).json({
        error: 'Missing required fields',
        message: 'Required: ip, method, uri'
      });
    }

    // The path is appended directly after the host, so it must be an absolute
    // path; anything else could change which host the request is sent to.
    if (typeof uri !== 'string' || !uri.startsWith('/')) {
      return res.status(400).json({
        error: 'Invalid uri',
        message: 'uri must be a string starting with "/"'
      });
    }

    // Build target URL
    let targetPath = uri;
    const queryParams = new URLSearchParams();
    const bodyParams = new URLSearchParams();

    if (Array.isArray(params)) {
      for (const p of params) {
        const name = p?.name;
        const value = p?.value ?? '';
        const location = String(p?.location || 'body').toLowerCase();

        if (!name) continue;

        if (location === 'query') {
          queryParams.append(name, String(value));
        } else if (location === 'path') {
          // Replace {name} or :name in path. The name is escaped so that
          // caller-supplied text cannot inject regex syntax.
          const placeholder = new RegExp(`[{:]${escapeRegExp(name)}[}]?`, 'g');
          targetPath = targetPath.replace(placeholder, encodeURIComponent(String(value)));
        } else {
          // Default to body
          bodyParams.append(name, String(value));
        }
      }
    }

    const queryString = queryParams.toString();
    const fullUrl = `http://${ip}${targetPath}${queryString ? `?${queryString}` : ''}`;

    // Prepare fetch options
    const upperMethod = String(method).toUpperCase();
    const fetchOptions = { method: upperMethod, headers: {} };

    if (upperMethod !== 'GET') {
      // Default to form-encoded body for generic proxy
      fetchOptions.headers['Content-Type'] = 'application/x-www-form-urlencoded';
      fetchOptions.body = bodyParams.toString();
    }

    // Debug logging to trace upstream requests
    logger.debug('[proxy-call] →', upperMethod, fullUrl);
    if (upperMethod !== 'GET') {
      logger.debug('[proxy-call] body:', fetchOptions.body);
    }

    // Execute request
    const response = await fetch(fullUrl, fetchOptions);
    const respContentType = response.headers.get('content-type') || '';

    const data = respContentType.includes('application/json')
      ? await response.json()
      : await response.text();

    if (!response.ok) {
      // Surface upstream failure details for easier debugging
      console.warn('[proxy-call] Upstream error', response.status, response.statusText, 'for', upperMethod, fullUrl);
      return res.status(response.status).json({
        error: 'Upstream request failed',
        status: response.status,
        statusText: response.statusText,
        data
      });
    }

    return res.json({ success: true, data });
  } catch (error) {
    console.error('Error in /api/proxy-call:', error);
    return res.status(500).json({ error: 'Proxy call failed', message: error.message });
  }
});

// Proxy endpoint to get status from a specific node
app.get('/api/node/status/:ip', async (req, res) => {
  try {
    const nodeIp = req.params.ip;

    // Create a temporary client for the specific node
    const nodeClient = new SporeApiClient(`http://${nodeIp}`);
    const nodeStatus = await nodeClient.getSystemStatus();
    res.json(nodeStatus);
  } catch (error) {
    console.error(`Error fetching status from node ${req.params.ip}:`, error);
    res.status(500).json({
      error: `Failed to fetch status from node ${req.params.ip}`,
      message: error.message
    });
  }
});

// Endpoint to trigger a cluster refresh
app.post('/api/cluster/refresh', async (req, res) => {
  try {
    const { reason } = req.body || {};

    console.log(`🔄 Manual cluster refresh triggered: ${reason || 'unknown reason'}`);
    console.log(`📡 WebSocket clients connected: ${wsClients.size}`);

    // Trigger a cluster update broadcast
    broadcastMemberListChange(reason || 'manual_refresh');

    res.json({
      success: true,
      message: 'Cluster refresh triggered',
      reason: reason || 'manual_refresh',
      wsClients: wsClients.size
    });
  } catch (error) {
    console.error('Error triggering cluster refresh:', error);
    res.status(500).json({
      error: 'Failed to trigger cluster refresh',
      message: error.message
    });
  }
});

// File upload endpoint for firmware updates
app.post('/api/node/update', async (req, res) => {
  try {
    const nodeIp = req.query.ip || req.headers['x-node-ip'];

    if (!nodeIp) {
      return res.status(400).json({
        error: 'Node IP address is required',
        message: 'Please provide the target node IP address'
      });
    }

    // Check if we have a file in the request
    if (!req.files || !req.files.file) {
      console.log('File upload request received but no file found:', {
        hasFiles: !!req.files,
        fileKeys: req.files ? Object.keys(req.files) : [],
        contentType: req.headers['content-type']
      });
      return res.status(400).json({
        error: 'No file data received',
        message: 'Please select a firmware file to upload'
      });
    }

    const uploadedFile = req.files.file;
    console.log(`File upload received:`, {
      nodeIp: nodeIp,
      filename: uploadedFile.name,
      fileSize: uploadedFile.data.length,
      mimetype: uploadedFile.mimetype,
      encoding: uploadedFile.encoding
    });

    // Create a temporary client for the specific node
    const nodeClient = new SporeApiClient(`http://${nodeIp}`);
    console.log(`Created SPORE client for node ${nodeIp}`);

    // Send the firmware data to the node
    console.log(`Starting firmware upload to SPORE device ${nodeIp}...`);

    try {
      const updateResult = await nodeClient.updateFirmware(uploadedFile.data, uploadedFile.name);
      console.log(`Firmware upload to SPORE device ${nodeIp} completed:`, updateResult);

      // Check if the SPORE device reported a failure
      if (updateResult && updateResult.status === 'FAIL') {
        console.error(`SPORE device ${nodeIp} reported firmware update failure:`, updateResult.message);
        return res.status(400).json({
          success: false,
          error: 'Firmware update failed',
          message: updateResult.message || 'Firmware update failed on device',
          nodeIp: nodeIp,
          fileSize: uploadedFile.data.length,
          filename: uploadedFile.name,
          result: updateResult
        });
      }

      res.json({
        success: true,
        message: 'Firmware uploaded successfully',
        nodeIp: nodeIp,
        fileSize: uploadedFile.data.length,
        filename: uploadedFile.name,
        result: updateResult
      });
    } catch (uploadError) {
      console.error(`Firmware upload to SPORE device ${nodeIp} failed:`, uploadError);
      // Keep the original error attached so the stack is not lost.
      throw new Error(`SPORE device upload failed: ${uploadError.message}`, { cause: uploadError });
    }
  } catch (error) {
    console.error('Error uploading firmware:', error);
    res.status(500).json({
      error: 'Failed to upload firmware',
      message: error.message
    });
  }
});

// Health check endpoint: 200 when healthy, 503 when degraded
app.get('/api/health', (req, res) => {
  const health = {
    status: 'healthy',
    timestamp: new Date().toISOString(),
    services: {
      http: true,
      udp: udpServer.listening,
      sporeClient: !!sporeClient
    },
    cluster: {
      totalNodes: discoveredNodes.size,
      primaryNode: primaryNodeIp,
      udpPort: UDP_PORT,
      serverRunning: udpServer.listening
    }
  };

  // If no nodes discovered, mark as degraded
  if (discoveredNodes.size === 0) {
    health.status = 'degraded';
    health.message = 'No SPORE nodes discovered yet';
  }

  // If no client initialized, mark as degraded
  if (!sporeClient) {
    health.status = 'degraded';
    health.message = health.message
      ? `${health.message}; SPORE client not initialized`
      : 'SPORE client not initialized';
  }

  const statusCode = health.status === 'healthy' ? 200 : 503;
  res.status(statusCode).json(health);
});

// WebSocket server setup - will be initialized after HTTP server
let wss = null;
const wsClients = new Set();

/**
 * Send an already-serialized message to every WebSocket client that is open.
 * @param {string} message - Serialized payload.
 */
function sendToOpenClients(message) {
  wsClients.forEach((client) => {
    if (client.readyState === WebSocket.OPEN) {
      client.send(message);
    }
  });
}

/**
 * Broadcast the current cluster state ('cluster/update') to all connected
 * WebSocket clients. Member data is fetched asynchronously; failures are logged.
 */
function broadcastClusterUpdate() {
  if (wsClients.size === 0 || !wss) return;

  const startTime = Date.now();
  logger.debug(`📡 [${new Date().toISOString()}] Starting cluster update broadcast to ${wsClients.size} clients`);

  // Get cluster members asynchronously
  getCurrentClusterMembers().then((members) => {
    const clusterData = {
      topic: 'cluster/update',
      members: members,
      primaryNode: primaryNodeIp,
      totalNodes: discoveredNodes.size,
      timestamp: new Date().toISOString()
    };

    const message = JSON.stringify(clusterData);
    const broadcastTime = Date.now() - startTime;

    logger.debug(`📡 [${new Date().toISOString()}] Broadcasting cluster update to ${wsClients.size} WebSocket clients (took ${broadcastTime}ms)`);
    logger.debug(`📊 Cluster data: ${members.length} members, primary: ${primaryNodeIp || 'none'}`);

    sendToOpenClients(message);
  }).catch((error) => {
    console.error('Error broadcasting cluster update:', error);
  });
}

/**
 * Broadcast a 'node/discovery' event to all connected WebSocket clients.
 * @param {string} nodeIp - IP of the affected node.
 * @param {string} action - 'discovered' or 'stale'.
 */
function broadcastNodeDiscovery(nodeIp, action) {
  if (wsClients.size === 0 || !wss) return;

  const eventData = {
    topic: 'node/discovery',
    action: action, // 'discovered' or 'stale'
    nodeIp: nodeIp,
    timestamp: new Date().toISOString()
  };

  sendToOpenClients(JSON.stringify(eventData));
}

/**
 * Log why the member list changed and broadcast a cluster update.
 * @param {string} [reason='update'] - Human-readable reason, used for logging only.
 */
function broadcastMemberListChange(reason = 'update') {
  const timestamp = new Date().toISOString();
  logger.debug(`🔄 [${timestamp}] Member list changed (${reason}), broadcasting update`);
  broadcastClusterUpdate();
}

/**
 * Build the member list for WebSocket clients.
 * Fetches cluster status through the primary node (with failover), merges it
 * into discoveredNodes, and overlays locally tracked fields. Falls back to
 * local data alone when the API call fails.
 *
 * Heartbeats are the source of truth for liveness: API data never refreshes
 * `lastSeen` of a known node and never overrides a local 'inactive' status,
 * otherwise stale nodes would be kept alive by the broadcast itself.
 * @returns {Promise<object[]>} Member objects; empty when no nodes are known.
 */
async function getCurrentClusterMembers() {
  try {
    if (discoveredNodes.size === 0) {
      return [];
    }

    // Fetch real cluster data from SPORE nodes for accurate information
    logger.debug(`📡 Fetching real cluster data from ${discoveredNodes.size} nodes for WebSocket broadcast`);
    const clusterResponse = await performWithFailover((client) => client.getClusterStatus());
    const apiMembers = Array.isArray(clusterResponse?.members) ? clusterResponse.members : [];

    // Debug: Log the labels from the API response
    apiMembers.forEach((member) => {
      if (member.labels && Object.keys(member.labels).length > 0) {
        logger.debug(`🏷️ API member ${member.ip} labels:`, member.labels);
      }
    });

    // Update our local discoveredNodes with fresh information from the API
    let updatedNodes = false;
    apiMembers.forEach((apiMember) => {
      const localNode = discoveredNodes.get(apiMember.ip);
      const apiLabels = apiMember.labels || {};

      if (!localNode) {
        // New node discovered via API - shouldn't happen but handle it
        logger.debug(`🆕 New node discovered via API: ${apiMember.ip}`);
        const nowSeconds = Math.floor(Date.now() / 1000);
        discoveredNodes.set(apiMember.ip, {
          ip: apiMember.ip,
          hostname: apiMember.hostname,
          status: apiMember.status,
          latency: apiMember.latency,
          labels: apiLabels,
          discoveredAt: nowSeconds, // epoch seconds, same unit as heartbeat-discovered nodes
          lastSeen: nowSeconds
        });
        updatedNodes = true;
        return;
      }

      const locallyInactive = localNode.status === 'inactive';
      const needsUpdate =
        localNode.hostname !== apiMember.hostname ||
        (!locallyInactive && localNode.status !== apiMember.status) ||
        localNode.latency !== apiMember.latency ||
        JSON.stringify(localNode.labels || {}) !== JSON.stringify(apiLabels);

      if (needsUpdate) {
        logger.debug(`🔄 Updating local node ${apiMember.ip} with fresh API data`);
        localNode.hostname = apiMember.hostname;
        if (!locallyInactive) {
          localNode.status = apiMember.status;
        }
        localNode.latency = apiMember.latency;
        localNode.labels = apiLabels;
        updatedNodes = true;
      }
    });

    // If we updated any nodes, broadcast the changes
    if (updatedNodes) {
      logger.debug(`📡 Local node data updated, triggering immediate broadcast`);
      // Note: We don't call broadcastMemberListChange here because we're already in the middle of a broadcast
      // The calling function will handle the broadcast
    }

    // Enhance API data with our local status information
    const enhancedMembers = apiMembers.map((apiMember) => {
      const localNode = discoveredNodes.get(apiMember.ip);
      if (!localNode) {
        return apiMember;
      }

      // Use our local status (which may be 'inactive' if the node became stale)
      return {
        ...apiMember,
        status: localNode.status || apiMember.status,
        hostname: localNode.hostname || apiMember.hostname,
        lastSeen: localNode.lastSeen || apiMember.lastSeen,
        labels: localNode.labels || apiMember.labels || {},
        resources: localNode.resources || apiMember.resources || {}
      };
    });

    logger.debug(`📊 Returning ${enhancedMembers.length} enhanced cluster members via WebSocket`);
    return enhancedMembers;
  } catch (error) {
    console.error('Error getting cluster members for WebSocket:', error);

    // Fallback to local data if API fails
    logger.debug('⚠️ API failed, falling back to local discoveredNodes data');
    const fallbackMembers = Array.from(discoveredNodes.values()).map((node) => ({
      ip: node.ip,
      hostname: node.hostname || 'Unknown Device',
      status: node.status || 'active', // Use stored status (may be 'inactive')
      latency: node.latency || 0,
      lastSeen: node.lastSeen || Math.floor(Date.now() / 1000),
      labels: node.labels || {},
      resources: node.resources || {}
    }));

    logger.debug(`📊 Fallback: Returning ${fallbackMembers.length} local cluster members`);
    return fallbackMembers;
  }
}

/**
 * Attach the WebSocket server to the HTTP server and register client handlers.
 * Must be called after the HTTP server has been created.
 * @param {object} httpServer - The HTTP server returned by app.listen().
 */
function initializeWebSocketServer(httpServer) {
  wss = new WebSocket.Server({ server: httpServer });

  // WebSocket connection handler
  wss.on('connection', (ws) => {
    logger.debug('WebSocket client connected');
    wsClients.add(ws);

    // Send current cluster state to newly connected client
    if (discoveredNodes.size > 0) {
      // Get cluster members asynchronously without blocking
      getCurrentClusterMembers().then((members) => {
        // The client may have disconnected while member data was being fetched.
        if (ws.readyState !== WebSocket.OPEN) {
          return;
        }

        const clusterData = {
          // `topic` matches the periodic broadcasts; `type` is kept for
          // clients that still read the legacy field.
          topic: 'cluster/update',
          type: 'cluster_update',
          members: members,
          primaryNode: primaryNodeIp,
          totalNodes: discoveredNodes.size,
          timestamp: new Date().toISOString()
        };
        logger.debug(`🔌 Sending initial cluster state to new WebSocket client: ${members.length} members`);
        ws.send(JSON.stringify(clusterData));
      }).catch((error) => {
        console.error('Error sending initial cluster state:', error);
      });
    }

    // Handle client disconnection
    ws.on('close', () => {
      logger.debug('WebSocket client disconnected');
      wsClients.delete(ws);
    });

    // Handle WebSocket errors
    ws.on('error', (error) => {
      console.error('WebSocket error:', error);
      wsClients.delete(ws);
    });
  });

  console.log('WebSocket server initialized');
}

// Start the server
const server = app.listen(PORT, '0.0.0.0', () => {
  console.log(`Server is running on http://0.0.0.0:${PORT}`);
  console.log(`Accessible from: http://YOUR_COMPUTER_IP:${PORT}`);
  console.log(`UDP heartbeat server listening on port ${UDP_PORT}`);

  // Initialize WebSocket server after HTTP server is running
  initializeWebSocketServer(server);
  console.log('WebSocket server ready for real-time updates');

  console.log('Waiting for cluster/heartbeat and node/update messages from SPORE nodes...');
});

// Upper bound on how long a graceful shutdown may take before the process is forced to exit
const SHUTDOWN_TIMEOUT_MS = 5000;

/**
 * Close the UDP, WebSocket and HTTP servers, then exit.
 * WebSocket connections are terminated explicitly because an upgraded socket
 * keeps the HTTP server open and server.close() would otherwise never finish.
 * @param {string} signal - Name of the signal that triggered the shutdown.
 */
function shutdownGracefully(signal) {
  console.log(`\nReceived ${signal}. Shutting down gracefully...`);

  udpServer.close(() => {
    console.log('UDP heartbeat server closed.');
  });

  if (wss) {
    wsClients.forEach((client) => client.terminate());
    wss.close();
  }

  server.close(() => {
    console.log('HTTP server closed.');
    process.exit(0);
  });

  // Safety net for lingering keep-alive connections; unref() so the timer
  // itself never keeps the process alive.
  setTimeout(() => process.exit(1), SHUTDOWN_TIMEOUT_MS).unref();
}

// Graceful shutdown handling
process.on('SIGINT', () => shutdownGracefully('SIGINT'));
process.on('SIGTERM', () => shutdownGracefully('SIGTERM'));

// Handle uncaught exceptions
process.on('uncaughtException', (err) => {
  console.error('Uncaught Exception:', err);
  udpServer.close();
  server.close();
  process.exit(1);
});

process.on('unhandledRejection', (reason, promise) => {
  console.error('Unhandled Rejection at:', promise, 'reason:', reason);
  udpServer.close();
  server.close();
  process.exit(1);
});