/**
 * SPORE cluster gateway.
 *
 * Listens for CLUSTER_DISCOVERY datagrams on a UDP socket, tracks the nodes
 * that announce themselves, keeps a SporeApiClient pointed at one "primary"
 * node, and exposes HTTP endpoints for discovery state, proxied node queries,
 * and firmware upload.
 */
const express = require('express');
const path = require('path');
const fs = require('fs');
const dgram = require('dgram');
const net = require('net');
const SporeApiClient = require('./src/client');

const app = express();
const PORT = process.env.PORT || 3001;

// Middleware
app.use(express.json());
app.use(express.urlencoded({ extended: true }));

// UDP discovery configuration
const UDP_PORT = 4210;
const DISCOVERY_MESSAGE = 'CLUSTER_DISCOVERY';

// Nodes not heard from within this window are dropped by cleanupStaleNodes().
const STALE_NODE_THRESHOLD_MS = 5 * 60 * 1000;
// How often stale nodes are purged and the client binding is re-checked.
const MAINTENANCE_INTERVAL_MS = 5000;

// UDP socket used for auto discovery
const udpServer = dgram.createSocket('udp4');

// Discovered nodes keyed by source IP: { ip, port, discoveredAt, lastSeen }
const discoveredNodes = new Map();
let primaryNodeIp = null;

// SPORE API client bound to the current primary node (null when there is none)
let sporeClient = null;

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/**
 * Close the UDP socket without throwing if it is already closed.
 *
 * dgram's close() throws ERR_SOCKET_DGRAM_NOT_RUNNING on a socket that is not
 * running (e.g. one the 'error' handler already closed); that case is ignored
 * so shutdown paths cannot crash on it.
 *
 * @param {Function} [onClosed] - invoked once the socket has closed.
 */
function closeUdpServer(onClosed) {
  try {
    udpServer.close(onClosed);
  } catch (err) {
    if (err.code !== 'ERR_SOCKET_DGRAM_NOT_RUNNING') {
      console.error('Error closing UDP server:', err);
    }
  }
}

/**
 * @param {*} ip - candidate value taken from a request.
 * @returns {boolean} true only for a string that is an IPv4/IPv6 literal.
 */
function isValidNodeIp(ip) {
  return typeof ip === 'string' && net.isIP(ip) !== 0;
}

/**
 * Build the HTTP base URL for a node. IPv6 literals are bracketed so the
 * result is a valid URL; everything else is interpolated unchanged.
 *
 * @param {string} nodeIp
 * @returns {string}
 */
function nodeBaseUrl(nodeIp) {
  return net.isIPv6(nodeIp) ? `http://[${nodeIp}]` : `http://${nodeIp}`;
}

/**
 * Extract the multipart boundary from a Content-Type header value.
 * Accepts both quoted and unquoted forms and ignores trailing parameters.
 *
 * @param {string|undefined} contentType
 * @returns {string|null} the boundary, or null if none is present.
 */
function extractBoundary(contentType) {
  if (typeof contentType !== 'string') {
    return null;
  }
  const match = contentType.match(/boundary=(?:"([^"]+)"|([^;\s]+))/i);
  return match ? match[1] || match[2] : null;
}

/**
 * Send the standard 503 used when no SporeApiClient is available.
 *
 * @param {import('express').Response} res
 */
function sendNoClientResponse(res) {
  return res.status(503).json({
    error: 'Service unavailable',
    message: 'No SPORE nodes discovered yet. Waiting for CLUSTER_DISCOVERY messages...',
    discoveredNodes: Array.from(discoveredNodes.keys()),
  });
}

/**
 * Build a route handler that forwards one query to the primary node.
 *
 * @param {string} description - noun phrase used in log and error messages
 *   ("Error fetching <description>:", "Failed to fetch <description>").
 * @param {(client: object) => Promise<*>} fetchFromClient - performs the call.
 * @returns {Function} async Express handler.
 */
function primaryNodeHandler(description, fetchFromClient) {
  return async (req, res) => {
    try {
      if (!sporeClient) {
        return sendNoClientResponse(res);
      }
      const result = await fetchFromClient(sporeClient);
      res.json(result);
    } catch (error) {
      console.error(`Error fetching ${description}:`, error);
      res.status(500).json({
        error: `Failed to fetch ${description}`,
        message: error.message,
      });
    }
  };
}

// ---------------------------------------------------------------------------
// UDP discovery
// ---------------------------------------------------------------------------

udpServer.on('error', (err) => {
  if (err.code === 'EADDRINUSE') {
    console.error(`UDP port ${UDP_PORT} is already in use. Please check if another instance is running.`);
  } else {
    console.error('UDP Server error:', err);
  }
  closeUdpServer();
});

udpServer.on('message', (msg, rinfo) => {
  try {
    const message = msg.toString().trim();
    const { address: sourceIp, port: sourcePort } = rinfo;

    if (message !== DISCOVERY_MESSAGE) {
      console.log(`Received unknown message from ${sourceIp}:${sourcePort}: "${message}"`);
      return;
    }

    // Refresh port/lastSeen on every announcement, but keep the original
    // discoveredAt so it reflects when the node was first seen.
    const now = new Date();
    const known = discoveredNodes.get(sourceIp);
    discoveredNodes.set(sourceIp, {
      ip: sourceIp,
      port: sourcePort,
      discoveredAt: known ? known.discoveredAt : now,
      lastSeen: now,
    });

    // Adopt this node as primary if we don't have one yet
    if (!primaryNodeIp) {
      primaryNodeIp = sourceIp;
      console.log(`Set primary node to ${sourceIp}`);
      // Immediately try to initialize the client
      updateSporeClient();
    }
  } catch (error) {
    console.error('Error processing UDP message:', error);
  }
});

udpServer.on('listening', () => {
  const address = udpServer.address();
  console.log(`UDP discovery server listening on ${address.address}:${address.port}`);
});

// Bind UDP server to listen for discovery messages
udpServer.bind(UDP_PORT, () => {
  console.log(`UDP discovery server bound to port ${UDP_PORT}`);
});

// ---------------------------------------------------------------------------
// Primary node / client management
// ---------------------------------------------------------------------------

/**
 * Create a SporeApiClient for the given node.
 *
 * @param {string|null} nodeIp
 * @returns {object|null} the client, or null if nodeIp is empty or the
 *   constructor throws.
 */
function initializeSporeClient(nodeIp) {
  if (!nodeIp) {
    console.warn('No node IP available for SporeApiClient initialization');
    return null;
  }

  try {
    const client = new SporeApiClient(nodeBaseUrl(nodeIp));
    console.log(`Initialized SporeApiClient with node IP: ${nodeIp}`);
    return client;
  } catch (error) {
    console.error(`Failed to initialize SporeApiClient with IP ${nodeIp}:`, error);
    return null;
  }
}

/**
 * Remove nodes whose lastSeen is older than STALE_NODE_THRESHOLD_MS and clear
 * the primary selection if the primary node was among them.
 */
function cleanupStaleNodes() {
  const now = new Date();

  for (const [ip, node] of discoveredNodes.entries()) {
    if (now - node.lastSeen > STALE_NODE_THRESHOLD_MS) {
      console.log(`Removing stale node: ${ip} (last seen: ${node.lastSeen.toISOString()})`);
      discoveredNodes.delete(ip);

      // If this was our primary node, clear it
      if (primaryNodeIp === ip) {
        primaryNodeIp = null;
        console.log('Primary node became stale, clearing primary node selection');
      }
    }
  }
}

/**
 * Return the primary node IP, keeping the current one while it is still in
 * discoveredNodes and otherwise promoting the most recently seen node.
 *
 * @returns {string|null} the primary node IP, or null if no nodes are known.
 */
function selectBestPrimaryNode() {
  if (discoveredNodes.size === 0) {
    return null;
  }

  // If we already have a valid primary node, keep it
  if (primaryNodeIp && discoveredNodes.has(primaryNodeIp)) {
    return primaryNodeIp;
  }

  // Select the most recently seen node as primary
  let bestNode = null;
  let mostRecent = new Date(0);
  for (const [ip, node] of discoveredNodes.entries()) {
    if (node.lastSeen > mostRecent) {
      mostRecent = node.lastSeen;
      bestNode = ip;
    }
  }

  if (bestNode && bestNode !== primaryNodeIp) {
    primaryNodeIp = bestNode;
    console.log(`Selected new primary node: ${bestNode}`);
  }

  return bestNode;
}

/**
 * Pick a random node other than the current primary and make it primary.
 *
 * @returns {string|null} the new primary IP; the unchanged current primary if
 *   no other node exists; null if no nodes are known.
 */
function selectRandomPrimaryNode() {
  if (discoveredNodes.size === 0) {
    return null;
  }

  const availableNodes = Array.from(discoveredNodes.keys()).filter((ip) => ip !== primaryNodeIp);
  if (availableNodes.length === 0) {
    // No other nodes available, keep current primary
    return primaryNodeIp;
  }

  const randomIndex = Math.floor(Math.random() * availableNodes.length);
  const randomNode = availableNodes[randomIndex];

  primaryNodeIp = randomNode;
  console.log(`Randomly selected new primary node: ${randomNode}`);

  return randomNode;
}

/**
 * Re-bind sporeClient to the current best primary node. When no node is
 * available the client is reset to null so status endpoints do not report a
 * client that still points at a node that has been removed.
 */
function updateSporeClient() {
  const nodeIp = selectBestPrimaryNode();
  sporeClient = nodeIp ? initializeSporeClient(nodeIp) : null;
}

// Periodic maintenance: purge stale nodes and repair the client binding
setInterval(() => {
  cleanupStaleNodes();

  if (!sporeClient || !primaryNodeIp || !discoveredNodes.has(primaryNodeIp)) {
    updateSporeClient();
  }
}, MAINTENANCE_INTERVAL_MS);

// ---------------------------------------------------------------------------
// HTTP routes
// ---------------------------------------------------------------------------

// Serve static files from public directory
app.use(express.static(path.join(__dirname, 'public')));

// Serve the main HTML page
app.get('/', (req, res) => {
  res.sendFile(path.join(__dirname, 'public', 'index.html'));
});

// API endpoint to get discovered nodes
app.get('/api/discovery/nodes', (req, res) => {
  const nodes = Array.from(discoveredNodes.values()).map((node) => ({
    ...node,
    discoveredAt: node.discoveredAt.toISOString(),
    lastSeen: node.lastSeen.toISOString(),
    isPrimary: node.ip === primaryNodeIp,
  }));

  res.json({
    primaryNode: primaryNodeIp,
    totalNodes: discoveredNodes.size,
    nodes: nodes,
    clientInitialized: !!sporeClient,
    clientBaseUrl: sporeClient ? sporeClient.baseUrl : null,
    discoveryStatus: {
      udpPort: UDP_PORT,
      discoveryMessage: DISCOVERY_MESSAGE,
      serverRunning: udpServer.listening,
    },
  });
});

// API endpoint to manually trigger discovery refresh
app.post('/api/discovery/refresh', (req, res) => {
  try {
    cleanupStaleNodes();
    updateSporeClient();

    res.json({
      success: true,
      message: 'Discovery refresh completed',
      primaryNode: primaryNodeIp,
      totalNodes: discoveredNodes.size,
      clientInitialized: !!sporeClient,
    });
  } catch (error) {
    console.error('Error during discovery refresh:', error);
    res.status(500).json({
      error: 'Discovery refresh failed',
      message: error.message,
    });
  }
});

// API endpoint to randomly select a new primary node
app.post('/api/discovery/random-primary', (req, res) => {
  try {
    if (discoveredNodes.size === 0) {
      return res.status(404).json({
        error: 'No nodes available',
        message: 'No SPORE nodes have been discovered yet',
      });
    }

    const randomNode = selectRandomPrimaryNode();
    if (!randomNode) {
      return res.status(500).json({
        error: 'Selection failed',
        message: 'Failed to select a random primary node',
      });
    }

    // Point the client at the newly selected primary
    updateSporeClient();

    // Echo the caller-supplied timestamp if there is one
    const timestamp = req.body?.timestamp || new Date().toISOString();

    res.json({
      success: true,
      message: `Randomly selected new primary node: ${randomNode}`,
      primaryNode: primaryNodeIp,
      totalNodes: discoveredNodes.size,
      clientInitialized: !!sporeClient,
      timestamp: timestamp,
    });
  } catch (error) {
    console.error('Error selecting random primary node:', error);
    res.status(500).json({
      error: 'Random selection failed',
      message: error.message,
    });
  }
});

// API endpoint to manually set primary node
app.post('/api/discovery/primary/:ip', (req, res) => {
  try {
    const requestedIp = req.params.ip;

    if (!discoveredNodes.has(requestedIp)) {
      return res.status(404).json({
        error: 'Node not found',
        message: `Node with IP ${requestedIp} has not been discovered`,
      });
    }

    primaryNodeIp = requestedIp;
    updateSporeClient();

    res.json({
      success: true,
      message: `Primary node set to ${requestedIp}`,
      primaryNode: primaryNodeIp,
      clientInitialized: !!sporeClient,
    });
  } catch (error) {
    console.error('Error setting primary node:', error);
    res.status(500).json({
      error: 'Failed to set primary node',
      message: error.message,
    });
  }
});

// API endpoint to get cluster members
app.get(
  '/api/cluster/members',
  primaryNodeHandler('cluster members', (client) => client.getClusterStatus()),
);

// API endpoint to get task status
app.get(
  '/api/tasks/status',
  primaryNodeHandler('task status', (client) => client.getTaskStatus()),
);

// API endpoint to get system status
app.get(
  '/api/node/status',
  primaryNodeHandler('system status', (client) => client.getSystemStatus()),
);

// Proxy endpoint to get status from a specific node
app.get('/api/node/status/:ip', async (req, res) => {
  try {
    const nodeIp = req.params.ip;

    // The value becomes the host of an outbound request, so only accept an
    // IP literal rather than an arbitrary caller-chosen host string.
    if (!isValidNodeIp(nodeIp)) {
      return res.status(400).json({
        error: 'Invalid node IP address',
        message: 'The target node must be a valid IP address',
      });
    }

    // Create a temporary client for the specific node
    const nodeClient = new SporeApiClient(nodeBaseUrl(nodeIp));
    const nodeStatus = await nodeClient.getSystemStatus();
    res.json(nodeStatus);
  } catch (error) {
    console.error(`Error fetching status from node ${req.params.ip}:`, error);
    res.status(500).json({
      error: `Failed to fetch status from node ${req.params.ip}`,
      message: error.message,
    });
  }
});

// File upload endpoint for firmware updates
app.post(
  '/api/node/update',
  express.raw({ type: 'multipart/form-data', limit: '50mb' }),
  async (req, res) => {
    try {
      const nodeIp = req.query.ip || req.headers['x-node-ip'];

      if (!nodeIp) {
        return res.status(400).json({
          error: 'Node IP address is required',
          message: 'Please provide the target node IP address',
        });
      }

      // Same outbound-host restriction as the per-node status proxy
      if (!isValidNodeIp(nodeIp)) {
        return res.status(400).json({
          error: 'Invalid node IP address',
          message: 'The target node must be a valid IP address',
        });
      }

      const boundary = extractBoundary(req.headers['content-type']);
      if (!boundary) {
        return res.status(400).json({
          error: 'Invalid content type',
          message: 'Expected multipart/form-data with boundary',
        });
      }

      // Parse the multipart data to extract the file
      const fileData = parseMultipartData(req.body, boundary);

      if (!fileData.file) {
        return res.status(400).json({
          error: 'No file data received',
          message: 'Please select a firmware file to upload',
        });
      }

      console.log(`Uploading firmware to node ${nodeIp}, file size: ${fileData.file.data.length} bytes, filename: ${fileData.file.filename}`);

      // Create a temporary client for the specific node
      const nodeClient = new SporeApiClient(nodeBaseUrl(nodeIp));

      // Send the firmware data to the node
      const updateResult = await nodeClient.updateFirmware(fileData.file.data, fileData.file.filename);

      res.json({
        success: true,
        message: 'Firmware uploaded successfully',
        nodeIp: nodeIp,
        fileSize: fileData.file.data.length,
        filename: fileData.file.filename,
        result: updateResult,
      });
    } catch (error) {
      console.error('Error uploading firmware:', error);
      res.status(500).json({
        error: 'Failed to upload firmware',
        message: error.message,
      });
    }
  },
);

// Health check endpoint
app.get('/api/health', (req, res) => {
  const health = {
    status: 'healthy',
    timestamp: new Date().toISOString(),
    services: {
      http: true,
      udp: udpServer.listening,
      sporeClient: !!sporeClient,
    },
    discovery: {
      totalNodes: discoveredNodes.size,
      primaryNode: primaryNodeIp,
      udpPort: UDP_PORT,
      serverRunning: udpServer.listening,
    },
  };

  // If no nodes discovered, mark as degraded
  if (discoveredNodes.size === 0) {
    health.status = 'degraded';
    health.message = 'No SPORE nodes discovered yet';
  }

  // If no client initialized, mark as degraded
  if (!sporeClient) {
    health.status = 'degraded';
    health.message = health.message
      ? `${health.message}; SPORE client not initialized`
      : 'SPORE client not initialized';
  }

  const statusCode = health.status === 'healthy' ? 200 : 503;
  res.status(statusCode).json(health);
});

/**
 * Minimal multipart/form-data parser.
 *
 * @param {Buffer} buffer - raw request body.
 * @param {string} boundary - boundary token from the Content-Type header
 *   (without the leading "--").
 * @returns {Object<string, {data: Buffer, filename: (string|null)}>} parts
 *   keyed by form field name. The object has a null prototype so field names
 *   taken from the request cannot collide with Object.prototype members.
 *   Empty when the body is not a Buffer or contains no complete part.
 */
function parseMultipartData(buffer, boundary) {
  const data = Object.create(null);
  if (!Buffer.isBuffer(buffer) || buffer.length === 0) {
    return data;
  }

  const boundaryBuffer = Buffer.from(`--${boundary}`);

  let start = 0;
  let end = 0;

  while (true) {
    // Find the boundary line that opens the next part
    start = buffer.indexOf(boundaryBuffer, end);
    if (start === -1) break;

    // Find the boundary that terminates it. The closing "--boundary--"
    // begins with the same bytes, so this search finds it as well.
    end = buffer.indexOf(boundaryBuffer, start + boundaryBuffer.length);
    if (end === -1) break;

    // Skip the CRLF that ends the boundary line
    const partBuffer = buffer.subarray(start + boundaryBuffer.length + 2, end);

    // Headers and content are separated by an empty line
    const headerEnd = partBuffer.indexOf('\r\n\r\n');
    if (headerEnd === -1) continue;

    const headers = partBuffer.subarray(0, headerEnd).toString();
    let content = partBuffer.subarray(headerEnd + 4);

    // The CRLF immediately before a boundary belongs to the delimiter, not to
    // the part body; drop it so binary payloads keep their exact length.
    const length = content.length;
    if (length >= 2 && content[length - 2] === 0x0d && content[length - 1] === 0x0a) {
      content = content.subarray(0, length - 2);
    }

    // \b keeps name="..." from matching inside filename="..."
    const nameMatch = headers.match(/\bname="([^"]+)"/);
    const filenameMatch = headers.match(/filename="([^"]+)"/);

    if (nameMatch) {
      const fieldName = nameMatch[1];
      const filename = filenameMatch ? filenameMatch[1] : null;
      data[fieldName] = { data: content, filename: filename };
    }
  }

  return data;
}

// ---------------------------------------------------------------------------
// Startup and shutdown
// ---------------------------------------------------------------------------

// Start the server
const server = app.listen(PORT, () => {
  console.log(`Server is running on http://localhost:${PORT}`);
  console.log(`UDP discovery server listening on port ${UDP_PORT}`);
  console.log('Waiting for CLUSTER_DISCOVERY messages from SPORE nodes...');
});

/**
 * Close both servers and exit with code 0 once the HTTP server has closed.
 *
 * @param {string} signal - name of the signal that triggered the shutdown.
 */
function shutdown(signal) {
  console.log(`\nReceived ${signal}. Shutting down gracefully...`);
  closeUdpServer(() => {
    console.log('UDP server closed.');
  });
  server.close(() => {
    console.log('HTTP server closed.');
    process.exit(0);
  });
}

// Graceful shutdown handling
process.on('SIGINT', () => shutdown('SIGINT'));
process.on('SIGTERM', () => shutdown('SIGTERM'));

// Handle uncaught exceptions
process.on('uncaughtException', (err) => {
  console.error('Uncaught Exception:', err);
  closeUdpServer();
  server.close();
  process.exit(1);
});

process.on('unhandledRejection', (reason, promise) => {
  console.error('Unhandled Rejection at:', promise, 'reason:', reason);
  closeUdpServer();
  server.close();
  process.exit(1);
});