/**
 * SPORE cluster gateway.
 *
 * Runs an Express HTTP server that serves the static UI from `public/` and
 * proxies API requests to SPORE nodes, plus a UDP listener that discovers
 * nodes from their CLUSTER_DISCOVERY broadcasts. One discovered node is kept
 * as the "primary" node and is the target of the shared SporeApiClient.
 */
const dgram = require('dgram');
const fs = require('fs');
const net = require('net');
const path = require('path');

const express = require('express');
const fileUpload = require('express-fileupload');

const SporeApiClient = require('./src/client');

const app = express();
const PORT = process.env.PORT || 3001;

// UDP discovery configuration
const UDP_PORT = 4210;
const DISCOVERY_MESSAGE = 'CLUSTER_DISCOVERY';

// Nodes that have not announced themselves for this long are dropped.
const STALE_NODE_THRESHOLD_MS = 5 * 60 * 1000; // 5 minutes

// How often stale nodes are purged and the shared client is re-checked.
const MAINTENANCE_INTERVAL_MS = 5000;

// Middleware
app.use(express.json());
app.use(express.urlencoded({ extended: true }));

// File upload middleware
app.use(fileUpload({
  limits: { fileSize: 50 * 1024 * 1024 }, // 50MB limit
  abortOnLimit: true,
  responseOnLimit: 'File size limit has been reached',
  debug: false
}));

// ---------------------------------------------------------------------------
// Discovery state
// ---------------------------------------------------------------------------

// Discovered nodes keyed by source IP: { ip, port, discoveredAt, lastSeen }
const discoveredNodes = new Map();

// IP of the node the shared client talks to, or null when none is selected
let primaryNodeIp = null;

// Shared SporeApiClient bound to the primary node, or null when unavailable
let sporeClient = null;

// UDP socket used for auto discovery
const udpServer = dgram.createSocket('udp4');

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/**
 * Close the UDP socket without throwing when it is already closed.
 *
 * `dgram.Socket#close()` throws ERR_SOCKET_DGRAM_NOT_RUNNING on a socket that
 * is no longer running (for example after the 'error' handler closed it), which
 * would otherwise abort the shutdown paths.
 *
 * @param {Function} [callback] - Invoked once the socket is (or already was) closed.
 */
function closeUdpServer(callback) {
  try {
    udpServer.close(callback);
  } catch (error) {
    if (error.code === 'ERR_SOCKET_DGRAM_NOT_RUNNING') {
      if (callback) {
        callback();
      }
      return;
    }
    console.error('Error closing UDP server:', error);
  }
}

/**
 * Check whether a request-supplied value is a literal IPv4/IPv6 address.
 *
 * Node addresses taken from the request are interpolated into the URL that is
 * proxied to, so anything that is not a plain IP literal is rejected.
 *
 * @param {*} value - Candidate value from params, query or headers.
 * @returns {boolean} True when `value` is a string holding a valid IP address.
 */
function isValidNodeIp(value) {
  return typeof value === 'string' && net.isIP(value) !== 0;
}

/**
 * Build the HTTP base URL for a node, bracketing IPv6 literals.
 *
 * @param {string} ip - A validated IP address.
 * @returns {string} Base URL such as `http://10.0.0.5` or `http://[fe80::1]`.
 */
function buildNodeBaseUrl(ip) {
  return net.isIPv6(ip) ? `http://[${ip}]` : `http://${ip}`;
}

/**
 * Send the 400 response used when a request-supplied node IP is malformed.
 *
 * @param {object} res - Express response.
 * @returns {object} The Express response.
 */
function respondInvalidNodeIp(res) {
  return res.status(400).json({
    error: 'Invalid node IP address',
    message: 'The node IP must be a valid IPv4 or IPv6 address'
  });
}

/**
 * Send the 503 response used when no shared client is available.
 *
 * @param {object} res - Express response.
 * @returns {object} The Express response.
 */
function respondNoNodes(res) {
  return res.status(503).json({
    error: 'Service unavailable',
    message: 'No SPORE nodes discovered yet. Waiting for CLUSTER_DISCOVERY messages...',
    discoveredNodes: Array.from(discoveredNodes.keys())
  });
}

/**
 * Create a SporeApiClient for the given node.
 *
 * @param {string|null} nodeIp - IP of the node to talk to.
 * @returns {SporeApiClient|null} The client, or null when no IP was given or
 *   construction failed.
 */
function initializeSporeClient(nodeIp) {
  if (!nodeIp) {
    console.warn('No node IP available for SporeApiClient initialization');
    return null;
  }

  try {
    const client = new SporeApiClient(`http://${nodeIp}`);
    console.log(`Initialized SporeApiClient with node IP: ${nodeIp}`);
    return client;
  } catch (error) {
    console.error(`Failed to initialize SporeApiClient with IP ${nodeIp}:`, error);
    return null;
  }
}

/**
 * Remove nodes not seen within STALE_NODE_THRESHOLD_MS and clear the primary
 * selection when the primary node itself went stale.
 */
function cleanupStaleNodes() {
  const now = new Date();

  for (const [ip, node] of discoveredNodes.entries()) {
    if (now - node.lastSeen > STALE_NODE_THRESHOLD_MS) {
      console.log(`Removing stale node: ${ip} (last seen: ${node.lastSeen.toISOString()})`);
      discoveredNodes.delete(ip);

      if (primaryNodeIp === ip) {
        primaryNodeIp = null;
        console.log('Primary node became stale, clearing primary node selection');
      }
    }
  }
}

/**
 * Pick the primary node: keep the current one while it is still known,
 * otherwise switch to the most recently seen node.
 *
 * @returns {string|null} IP of the primary node, or null when none are known.
 */
function selectBestPrimaryNode() {
  if (discoveredNodes.size === 0) {
    return null;
  }

  if (primaryNodeIp && discoveredNodes.has(primaryNodeIp)) {
    return primaryNodeIp;
  }

  let bestNode = null;
  let mostRecent = new Date(0);

  for (const [ip, node] of discoveredNodes.entries()) {
    if (node.lastSeen > mostRecent) {
      mostRecent = node.lastSeen;
      bestNode = ip;
    }
  }

  if (bestNode && bestNode !== primaryNodeIp) {
    primaryNodeIp = bestNode;
    console.log(`Selected new primary node: ${bestNode}`);
  }

  return bestNode;
}

/**
 * Switch the primary node to a random node other than the current primary.
 *
 * @returns {string|null} The new primary IP, the unchanged primary when it is
 *   the only known node, or null when no nodes are known.
 */
function selectRandomPrimaryNode() {
  if (discoveredNodes.size === 0) {
    return null;
  }

  const availableNodes = Array.from(discoveredNodes.keys()).filter((ip) => ip !== primaryNodeIp);

  if (availableNodes.length === 0) {
    // No alternative exists, so the current primary stays selected.
    return primaryNodeIp;
  }

  const randomIndex = Math.floor(Math.random() * availableNodes.length);
  const randomNode = availableNodes[randomIndex];

  primaryNodeIp = randomNode;
  console.log(`Randomly selected new primary node: ${randomNode}`);

  return randomNode;
}

/**
 * Re-bind the shared client to the current best primary node.
 *
 * When no node is available the client is cleared, so callers answer with 503
 * instead of proxying to a node that has already been dropped as stale.
 */
function updateSporeClient() {
  const nodeIp = selectBestPrimaryNode();
  sporeClient = nodeIp ? initializeSporeClient(nodeIp) : null;
}

// ---------------------------------------------------------------------------
// UDP discovery server
// ---------------------------------------------------------------------------

udpServer.on('error', (err) => {
  if (err.code === 'EADDRINUSE') {
    console.error(`UDP port ${UDP_PORT} is already in use. Please check if another instance is running.`);
  } else {
    console.error('UDP Server error:', err);
  }
  closeUdpServer();
});

udpServer.on('message', (msg, rinfo) => {
  try {
    const message = msg.toString().trim();
    const sourceIp = rinfo.address;
    const sourcePort = rinfo.port;

    if (message !== DISCOVERY_MESSAGE) {
      console.log(`Received unknown message from ${sourceIp}:${sourcePort}: "${message}"`);
      return;
    }

    // Keep the first-seen time of nodes we already know; only lastSeen (and
    // the source port) is refreshed on repeated announcements.
    const now = new Date();
    const known = discoveredNodes.get(sourceIp);
    discoveredNodes.set(sourceIp, {
      ip: sourceIp,
      port: sourcePort,
      discoveredAt: known ? known.discoveredAt : now,
      lastSeen: now
    });

    // Adopt this node as primary when none is selected, and bind the client
    // right away rather than waiting for the next maintenance tick.
    if (!primaryNodeIp) {
      primaryNodeIp = sourceIp;
      console.log(`Set primary node to ${sourceIp}`);
      updateSporeClient();
    }
  } catch (error) {
    console.error('Error processing UDP message:', error);
  }
});

udpServer.on('listening', () => {
  const address = udpServer.address();
  console.log(`UDP discovery server listening on ${address.address}:${address.port}`);
});

// Bind UDP server to listen for discovery messages
udpServer.bind(UDP_PORT, () => {
  console.log(`UDP discovery server bound to port ${UDP_PORT}`);
});

// Periodic maintenance: purge stale nodes and repair the client if needed
setInterval(() => {
  cleanupStaleNodes();

  if (!sporeClient || !primaryNodeIp || !discoveredNodes.has(primaryNodeIp)) {
    updateSporeClient();
  }
}, MAINTENANCE_INTERVAL_MS);

// ---------------------------------------------------------------------------
// HTTP routes
// ---------------------------------------------------------------------------

// Serve static files from public directory
app.use(express.static(path.join(__dirname, 'public')));

// Serve the main HTML page
app.get('/', (req, res) => {
  res.sendFile(path.join(__dirname, 'public', 'index.html'));
});

// API endpoint to get discovered nodes
app.get('/api/discovery/nodes', (req, res) => {
  const nodes = Array.from(discoveredNodes.values()).map((node) => ({
    ...node,
    discoveredAt: node.discoveredAt.toISOString(),
    lastSeen: node.lastSeen.toISOString(),
    isPrimary: node.ip === primaryNodeIp
  }));

  res.json({
    primaryNode: primaryNodeIp,
    totalNodes: discoveredNodes.size,
    nodes: nodes,
    clientInitialized: !!sporeClient,
    clientBaseUrl: sporeClient ? sporeClient.baseUrl : null,
    discoveryStatus: {
      udpPort: UDP_PORT,
      discoveryMessage: DISCOVERY_MESSAGE,
      serverRunning: udpServer.listening
    }
  });
});

// API endpoint to manually trigger discovery refresh
app.post('/api/discovery/refresh', (req, res) => {
  try {
    cleanupStaleNodes();
    updateSporeClient();

    res.json({
      success: true,
      message: 'Discovery refresh completed',
      primaryNode: primaryNodeIp,
      totalNodes: discoveredNodes.size,
      clientInitialized: !!sporeClient
    });
  } catch (error) {
    console.error('Error during discovery refresh:', error);
    res.status(500).json({
      error: 'Discovery refresh failed',
      message: error.message
    });
  }
});

// API endpoint to randomly select a new primary node
app.post('/api/discovery/random-primary', (req, res) => {
  try {
    if (discoveredNodes.size === 0) {
      return res.status(404).json({
        error: 'No nodes available',
        message: 'No SPORE nodes have been discovered yet'
      });
    }

    const randomNode = selectRandomPrimaryNode();

    if (!randomNode) {
      return res.status(500).json({
        error: 'Selection failed',
        message: 'Failed to select a random primary node'
      });
    }

    // Re-bind the shared client to the newly selected primary node
    updateSporeClient();

    // Echo the caller's timestamp when one was sent, otherwise use "now"
    const timestamp = req.body && req.body.timestamp
      ? req.body.timestamp
      : new Date().toISOString();

    res.json({
      success: true,
      message: `Randomly selected new primary node: ${randomNode}`,
      primaryNode: primaryNodeIp,
      totalNodes: discoveredNodes.size,
      clientInitialized: !!sporeClient,
      timestamp: timestamp
    });
  } catch (error) {
    console.error('Error selecting random primary node:', error);
    res.status(500).json({
      error: 'Random selection failed',
      message: error.message
    });
  }
});

// API endpoint to manually set primary node
app.post('/api/discovery/primary/:ip', (req, res) => {
  try {
    const requestedIp = req.params.ip;

    if (!discoveredNodes.has(requestedIp)) {
      return res.status(404).json({
        error: 'Node not found',
        message: `Node with IP ${requestedIp} has not been discovered`
      });
    }

    primaryNodeIp = requestedIp;
    updateSporeClient();

    res.json({
      success: true,
      message: `Primary node set to ${requestedIp}`,
      primaryNode: primaryNodeIp,
      clientInitialized: !!sporeClient
    });
  } catch (error) {
    console.error('Error setting primary node:', error);
    res.status(500).json({
      error: 'Failed to set primary node',
      message: error.message
    });
  }
});

// API endpoint to get cluster members
app.get('/api/cluster/members', async (req, res) => {
  try {
    if (!sporeClient) {
      return respondNoNodes(res);
    }

    const members = await sporeClient.getClusterStatus();
    res.json(members);
  } catch (error) {
    console.error('Error fetching cluster members:', error);
    res.status(500).json({
      error: 'Failed to fetch cluster members',
      message: error.message
    });
  }
});

// API endpoint to get task status (from a specific node when ?ip= is given)
app.get('/api/tasks/status', async (req, res) => {
  try {
    const { ip } = req.query;

    if (ip) {
      if (!isValidNodeIp(ip)) {
        return respondInvalidNodeIp(res);
      }

      try {
        const nodeClient = new SporeApiClient(buildNodeBaseUrl(ip));
        const taskStatus = await nodeClient.getTaskStatus();
        return res.json(taskStatus);
      } catch (innerError) {
        console.error('Error fetching task status from specific node:', innerError);
        return res.status(500).json({
          error: 'Failed to fetch task status from node',
          message: innerError.message
        });
      }
    }

    if (!sporeClient) {
      return respondNoNodes(res);
    }

    const taskStatus = await sporeClient.getTaskStatus();
    res.json(taskStatus);
  } catch (error) {
    console.error('Error fetching task status:', error);
    res.status(500).json({
      error: 'Failed to fetch task status',
      message: error.message
    });
  }
});

// API endpoint to get system status
app.get('/api/node/status', async (req, res) => {
  try {
    if (!sporeClient) {
      return respondNoNodes(res);
    }

    const systemStatus = await sporeClient.getSystemStatus();
    res.json(systemStatus);
  } catch (error) {
    console.error('Error fetching system status:', error);
    res.status(500).json({
      error: 'Failed to fetch system status',
      message: error.message
    });
  }
});

// Proxy endpoint to get status from a specific node
app.get('/api/node/status/:ip', async (req, res) => {
  try {
    const nodeIp = req.params.ip;

    if (!isValidNodeIp(nodeIp)) {
      return respondInvalidNodeIp(res);
    }

    // Temporary client for the specific node; the shared client is untouched
    const nodeClient = new SporeApiClient(buildNodeBaseUrl(nodeIp));
    const nodeStatus = await nodeClient.getSystemStatus();
    res.json(nodeStatus);
  } catch (error) {
    console.error(`Error fetching status from node ${req.params.ip}:`, error);
    res.status(500).json({
      error: `Failed to fetch status from node ${req.params.ip}`,
      message: error.message
    });
  }
});

// File upload endpoint for firmware updates
app.post('/api/node/update', async (req, res) => {
  try {
    const nodeIp = req.query.ip || req.headers['x-node-ip'];

    if (!nodeIp) {
      return res.status(400).json({
        error: 'Node IP address is required',
        message: 'Please provide the target node IP address'
      });
    }

    if (!isValidNodeIp(nodeIp)) {
      return respondInvalidNodeIp(res);
    }

    // Check if we have a file in the request
    if (!req.files || !req.files.file) {
      console.log('File upload request received but no file found:', {
        hasFiles: !!req.files,
        fileKeys: req.files ? Object.keys(req.files) : [],
        contentType: req.headers['content-type']
      });
      return res.status(400).json({
        error: 'No file data received',
        message: 'Please select a firmware file to upload'
      });
    }

    const uploadedFile = req.files.file;
    console.log(`File upload received:`, {
      nodeIp: nodeIp,
      filename: uploadedFile.name,
      fileSize: uploadedFile.data.length,
      mimetype: uploadedFile.mimetype,
      encoding: uploadedFile.encoding
    });

    // Temporary client for the specific node
    const nodeClient = new SporeApiClient(buildNodeBaseUrl(nodeIp));
    console.log(`Created SPORE client for node ${nodeIp}`);

    // Send the firmware data to the node
    console.log(`Starting firmware upload to SPORE device ${nodeIp}...`);

    try {
      const updateResult = await nodeClient.updateFirmware(uploadedFile.data, uploadedFile.name);
      console.log(`Firmware upload to SPORE device ${nodeIp} completed successfully:`, updateResult);

      res.json({
        success: true,
        message: 'Firmware uploaded successfully',
        nodeIp: nodeIp,
        fileSize: uploadedFile.data.length,
        filename: uploadedFile.name,
        result: updateResult
      });
    } catch (uploadError) {
      console.error(`Firmware upload to SPORE device ${nodeIp} failed:`, uploadError);
      throw new Error(`SPORE device upload failed: ${uploadError.message}`);
    }
  } catch (error) {
    console.error('Error uploading firmware:', error);
    res.status(500).json({
      error: 'Failed to upload firmware',
      message: error.message
    });
  }
});

// Health check endpoint: 200 when healthy, 503 when degraded
app.get('/api/health', (req, res) => {
  const health = {
    status: 'healthy',
    timestamp: new Date().toISOString(),
    services: {
      http: true,
      udp: udpServer.listening,
      sporeClient: !!sporeClient
    },
    discovery: {
      totalNodes: discoveredNodes.size,
      primaryNode: primaryNodeIp,
      udpPort: UDP_PORT,
      serverRunning: udpServer.listening
    }
  };

  // If no nodes discovered, mark as degraded
  if (discoveredNodes.size === 0) {
    health.status = 'degraded';
    health.message = 'No SPORE nodes discovered yet';
  }

  // If no client initialized, mark as degraded
  if (!sporeClient) {
    health.status = 'degraded';
    health.message = health.message
      ? `${health.message}; SPORE client not initialized`
      : 'SPORE client not initialized';
  }

  const statusCode = health.status === 'healthy' ? 200 : 503;
  res.status(statusCode).json(health);
});

// ---------------------------------------------------------------------------
// Startup and shutdown
// ---------------------------------------------------------------------------

// Start the server
const server = app.listen(PORT, () => {
  console.log(`Server is running on http://localhost:${PORT}`);
  console.log(`UDP discovery server listening on port ${UDP_PORT}`);
  console.log('Waiting for CLUSTER_DISCOVERY messages from SPORE nodes...');
});

/**
 * Close both servers and exit with code 0 once the HTTP server has closed.
 *
 * @param {string} signal - Name of the signal that triggered the shutdown.
 */
function shutdownGracefully(signal) {
  console.log(`\nReceived ${signal}. Shutting down gracefully...`);

  closeUdpServer(() => {
    console.log('UDP server closed.');
  });

  server.close(() => {
    console.log('HTTP server closed.');
    process.exit(0);
  });
}

// Graceful shutdown handling
process.on('SIGINT', () => shutdownGracefully('SIGINT'));
process.on('SIGTERM', () => shutdownGracefully('SIGTERM'));

// Handle uncaught exceptions
process.on('uncaughtException', (err) => {
  console.error('Uncaught Exception:', err);
  closeUdpServer();
  server.close();
  process.exit(1);
});

process.on('unhandledRejection', (reason, promise) => {
  console.error('Unhandled Rejection at:', promise, 'reason:', reason);
  closeUdpServer();
  server.close();
  process.exit(1);
});