584 lines
17 KiB
JavaScript
584 lines
17 KiB
JavaScript
const express = require('express');
|
|
const path = require('path');
|
|
const fs = require('fs');
|
|
const dgram = require('dgram');
|
|
const SporeApiClient = require('./src/client');
|
|
|
|
const app = express();
|
|
const PORT = process.env.PORT || 3001;
|
|
|
|
// Middleware
|
|
app.use(express.json());
|
|
app.use(express.urlencoded({ extended: true }));
|
|
|
|
// File upload middleware
|
|
const fileUpload = require('express-fileupload');
|
|
app.use(fileUpload({
|
|
limits: { fileSize: 50 * 1024 * 1024 }, // 50MB limit
|
|
abortOnLimit: true,
|
|
responseOnLimit: 'File size limit has been reached',
|
|
debug: false
|
|
}));
|
|
|
|
// UDP discovery configuration
|
|
const UDP_PORT = 4210;
|
|
const DISCOVERY_MESSAGE = 'CLUSTER_DISCOVERY';
|
|
|
|
// Initialize UDP server for auto discovery
|
|
const udpServer = dgram.createSocket('udp4');
|
|
|
|
// Store discovered nodes and their IPs
|
|
const discoveredNodes = new Map();
|
|
let primaryNodeIp = null;
|
|
|
|
// UDP server event handlers
|
|
udpServer.on('error', (err) => {
|
|
if (err.code === 'EADDRINUSE') {
|
|
console.error(`UDP port ${UDP_PORT} is already in use. Please check if another instance is running.`);
|
|
} else {
|
|
console.error('UDP Server error:', err);
|
|
}
|
|
udpServer.close();
|
|
});
|
|
|
|
udpServer.on('message', (msg, rinfo) => {
|
|
try {
|
|
const message = msg.toString().trim();
|
|
const sourceIp = rinfo.address;
|
|
const sourcePort = rinfo.port;
|
|
|
|
//console.log(`UDP message received from ${sourceIp}:${sourcePort}: "${message}"`);
|
|
|
|
if (message === DISCOVERY_MESSAGE) {
|
|
//console.log(`Received CLUSTER_DISCOVERY from ${sourceIp}:${sourcePort}`);
|
|
|
|
// Store the discovered node
|
|
const nodeInfo = {
|
|
ip: sourceIp,
|
|
port: sourcePort,
|
|
discoveredAt: new Date(),
|
|
lastSeen: new Date()
|
|
};
|
|
|
|
discoveredNodes.set(sourceIp, nodeInfo);
|
|
|
|
// Set as primary node if this is the first one or if we don't have one
|
|
if (!primaryNodeIp) {
|
|
primaryNodeIp = sourceIp;
|
|
console.log(`Set primary node to ${sourceIp}`);
|
|
|
|
// Immediately try to initialize the client
|
|
updateSporeClient();
|
|
}
|
|
|
|
// Update last seen timestamp
|
|
discoveredNodes.get(sourceIp).lastSeen = new Date();
|
|
|
|
//console.log(`Node ${sourceIp} added/updated. Total discovered nodes: ${discoveredNodes.size}`);
|
|
} else {
|
|
console.log(`Received unknown message from ${sourceIp}:${sourcePort}: "${message}"`);
|
|
}
|
|
} catch (error) {
|
|
console.error('Error processing UDP message:', error);
|
|
}
|
|
});
|
|
|
|
udpServer.on('listening', () => {
|
|
const address = udpServer.address();
|
|
console.log(`UDP discovery server listening on ${address.address}:${address.port}`);
|
|
});
|
|
|
|
// Bind UDP server to listen for discovery messages
|
|
udpServer.bind(UDP_PORT, () => {
|
|
console.log(`UDP discovery server bound to port ${UDP_PORT}`);
|
|
});
|
|
|
|
// Initialize the SPORE API client with dynamic IP
|
|
let sporeClient = null;
|
|
|
|
// Function to initialize or update the SporeApiClient
|
|
function initializeSporeClient(nodeIp) {
|
|
if (!nodeIp) {
|
|
console.warn('No node IP available for SporeApiClient initialization');
|
|
return null;
|
|
}
|
|
|
|
try {
|
|
const client = new SporeApiClient(`http://${nodeIp}`);
|
|
console.log(`Initialized SporeApiClient with node IP: ${nodeIp}`);
|
|
return client;
|
|
} catch (error) {
|
|
console.error(`Failed to initialize SporeApiClient with IP ${nodeIp}:`, error);
|
|
return null;
|
|
}
|
|
}
|
|
|
|
// Function to clean up stale discovered nodes (nodes not seen in the last 5 minutes)
|
|
function cleanupStaleNodes() {
|
|
const now = new Date();
|
|
const staleThreshold = 5 * 60 * 1000; // 5 minutes in milliseconds
|
|
|
|
for (const [ip, node] of discoveredNodes.entries()) {
|
|
if (now - node.lastSeen > staleThreshold) {
|
|
console.log(`Removing stale node: ${ip} (last seen: ${node.lastSeen.toISOString()})`);
|
|
discoveredNodes.delete(ip);
|
|
|
|
// If this was our primary node, clear it
|
|
if (primaryNodeIp === ip) {
|
|
primaryNodeIp = null;
|
|
console.log('Primary node became stale, clearing primary node selection');
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Function to select the best primary node
|
|
function selectBestPrimaryNode() {
|
|
if (discoveredNodes.size === 0) {
|
|
return null;
|
|
}
|
|
|
|
// If we already have a valid primary node, keep it
|
|
if (primaryNodeIp && discoveredNodes.has(primaryNodeIp)) {
|
|
return primaryNodeIp;
|
|
}
|
|
|
|
// Select the most recently seen node as primary
|
|
let bestNode = null;
|
|
let mostRecent = new Date(0);
|
|
|
|
for (const [ip, node] of discoveredNodes.entries()) {
|
|
if (node.lastSeen > mostRecent) {
|
|
mostRecent = node.lastSeen;
|
|
bestNode = ip;
|
|
}
|
|
}
|
|
|
|
if (bestNode && bestNode !== primaryNodeIp) {
|
|
primaryNodeIp = bestNode;
|
|
console.log(`Selected new primary node: ${bestNode}`);
|
|
}
|
|
|
|
return bestNode;
|
|
}
|
|
|
|
// Function to randomly select a primary node
|
|
function selectRandomPrimaryNode() {
|
|
if (discoveredNodes.size === 0) {
|
|
return null;
|
|
}
|
|
|
|
// Convert discovered nodes to array and filter out current primary
|
|
const availableNodes = Array.from(discoveredNodes.keys()).filter(ip => ip !== primaryNodeIp);
|
|
|
|
if (availableNodes.length === 0) {
|
|
// If no other nodes available, keep current primary
|
|
return primaryNodeIp;
|
|
}
|
|
|
|
// Randomly select from available nodes
|
|
const randomIndex = Math.floor(Math.random() * availableNodes.length);
|
|
const randomNode = availableNodes[randomIndex];
|
|
|
|
// Update primary node
|
|
primaryNodeIp = randomNode;
|
|
console.log(`Randomly selected new primary node: ${randomNode}`);
|
|
|
|
return randomNode;
|
|
}
|
|
|
|
// Initialize client when a node is discovered
|
|
function updateSporeClient() {
|
|
const nodeIp = selectBestPrimaryNode();
|
|
if (nodeIp) {
|
|
sporeClient = initializeSporeClient(nodeIp);
|
|
}
|
|
}
|
|
|
|
// Set up periodic tasks
|
|
setInterval(() => {
|
|
cleanupStaleNodes();
|
|
if (!sporeClient || !primaryNodeIp || !discoveredNodes.has(primaryNodeIp)) {
|
|
updateSporeClient();
|
|
}
|
|
}, 5000); // Check every 5 seconds
|
|
|
|
// Serve static files from public directory
|
|
app.use(express.static(path.join(__dirname, 'public')));
|
|
|
|
// Serve the main HTML page
|
|
app.get('/', (req, res) => {
|
|
res.sendFile(path.join(__dirname, 'public', 'index.html'));
|
|
});
|
|
|
|
// API endpoint to get discovered nodes
|
|
app.get('/api/discovery/nodes', (req, res) => {
|
|
const nodes = Array.from(discoveredNodes.values()).map(node => ({
|
|
...node,
|
|
discoveredAt: node.discoveredAt.toISOString(),
|
|
lastSeen: node.lastSeen.toISOString(),
|
|
isPrimary: node.ip === primaryNodeIp
|
|
}));
|
|
|
|
res.json({
|
|
primaryNode: primaryNodeIp,
|
|
totalNodes: discoveredNodes.size,
|
|
nodes: nodes,
|
|
clientInitialized: !!sporeClient,
|
|
clientBaseUrl: sporeClient ? sporeClient.baseUrl : null,
|
|
discoveryStatus: {
|
|
udpPort: UDP_PORT,
|
|
discoveryMessage: DISCOVERY_MESSAGE,
|
|
serverRunning: udpServer.listening
|
|
}
|
|
});
|
|
});
|
|
|
|
// API endpoint to manually trigger discovery refresh
|
|
app.post('/api/discovery/refresh', (req, res) => {
|
|
try {
|
|
// Clean up stale nodes
|
|
cleanupStaleNodes();
|
|
|
|
// Try to update the client
|
|
updateSporeClient();
|
|
|
|
res.json({
|
|
success: true,
|
|
message: 'Discovery refresh completed',
|
|
primaryNode: primaryNodeIp,
|
|
totalNodes: discoveredNodes.size,
|
|
clientInitialized: !!sporeClient
|
|
});
|
|
} catch (error) {
|
|
console.error('Error during discovery refresh:', error);
|
|
res.status(500).json({
|
|
error: 'Discovery refresh failed',
|
|
message: error.message
|
|
});
|
|
}
|
|
});
|
|
|
|
// API endpoint to randomly select a new primary node
|
|
app.post('/api/discovery/random-primary', (req, res) => {
|
|
try {
|
|
if (discoveredNodes.size === 0) {
|
|
return res.status(404).json({
|
|
error: 'No nodes available',
|
|
message: 'No SPORE nodes have been discovered yet'
|
|
});
|
|
}
|
|
|
|
// Randomly select a new primary node
|
|
const randomNode = selectRandomPrimaryNode();
|
|
|
|
if (!randomNode) {
|
|
return res.status(500).json({
|
|
error: 'Selection failed',
|
|
message: 'Failed to select a random primary node'
|
|
});
|
|
}
|
|
|
|
// Update the client with the new primary node
|
|
updateSporeClient();
|
|
|
|
// Get current timestamp for the response
|
|
const timestamp = req.body && req.body.timestamp ? req.body.timestamp : new Date().toISOString();
|
|
|
|
res.json({
|
|
success: true,
|
|
message: `Randomly selected new primary node: ${randomNode}`,
|
|
primaryNode: primaryNodeIp,
|
|
totalNodes: discoveredNodes.size,
|
|
clientInitialized: !!sporeClient,
|
|
timestamp: timestamp
|
|
});
|
|
} catch (error) {
|
|
console.error('Error selecting random primary node:', error);
|
|
res.status(500).json({
|
|
error: 'Random selection failed',
|
|
message: error.message
|
|
});
|
|
}
|
|
});
|
|
|
|
// API endpoint to manually set primary node
|
|
app.post('/api/discovery/primary/:ip', (req, res) => {
|
|
try {
|
|
const requestedIp = req.params.ip;
|
|
|
|
if (!discoveredNodes.has(requestedIp)) {
|
|
return res.status(404).json({
|
|
error: 'Node not found',
|
|
message: `Node with IP ${requestedIp} has not been discovered`
|
|
});
|
|
}
|
|
|
|
primaryNodeIp = requestedIp;
|
|
updateSporeClient();
|
|
|
|
res.json({
|
|
success: true,
|
|
message: `Primary node set to ${requestedIp}`,
|
|
primaryNode: primaryNodeIp,
|
|
clientInitialized: !!sporeClient
|
|
});
|
|
} catch (error) {
|
|
console.error('Error setting primary node:', error);
|
|
res.status(500).json({
|
|
error: 'Failed to set primary node',
|
|
message: error.message
|
|
});
|
|
}
|
|
});
|
|
|
|
// API endpoint to get cluster members
|
|
app.get('/api/cluster/members', async (req, res) => {
|
|
try {
|
|
if (!sporeClient) {
|
|
return res.status(503).json({
|
|
error: 'Service unavailable',
|
|
message: 'No SPORE nodes discovered yet. Waiting for CLUSTER_DISCOVERY messages...',
|
|
discoveredNodes: Array.from(discoveredNodes.keys())
|
|
});
|
|
}
|
|
|
|
const members = await sporeClient.getClusterStatus();
|
|
res.json(members);
|
|
} catch (error) {
|
|
console.error('Error fetching cluster members:', error);
|
|
res.status(500).json({
|
|
error: 'Failed to fetch cluster members',
|
|
message: error.message
|
|
});
|
|
}
|
|
});
|
|
|
|
// API endpoint to get task status
|
|
app.get('/api/tasks/status', async (req, res) => {
|
|
try {
|
|
const { ip } = req.query;
|
|
|
|
if (ip) {
|
|
try {
|
|
const nodeClient = new SporeApiClient(`http://${ip}`);
|
|
const taskStatus = await nodeClient.getTaskStatus();
|
|
return res.json(taskStatus);
|
|
} catch (innerError) {
|
|
console.error('Error fetching task status from specific node:', innerError);
|
|
return res.status(500).json({
|
|
error: 'Failed to fetch task status from node',
|
|
message: innerError.message
|
|
});
|
|
}
|
|
}
|
|
|
|
if (!sporeClient) {
|
|
return res.status(503).json({
|
|
error: 'Service unavailable',
|
|
message: 'No SPORE nodes discovered yet. Waiting for CLUSTER_DISCOVERY messages...',
|
|
discoveredNodes: Array.from(discoveredNodes.keys())
|
|
});
|
|
}
|
|
|
|
const taskStatus = await sporeClient.getTaskStatus();
|
|
res.json(taskStatus);
|
|
} catch (error) {
|
|
console.error('Error fetching task status:', error);
|
|
res.status(500).json({
|
|
error: 'Failed to fetch task status',
|
|
message: error.message
|
|
});
|
|
}
|
|
});
|
|
|
|
// API endpoint to get system status
|
|
app.get('/api/node/status', async (req, res) => {
|
|
try {
|
|
if (!sporeClient) {
|
|
return res.status(503).json({
|
|
error: 'Service unavailable',
|
|
message: 'No SPORE nodes discovered yet. Waiting for CLUSTER_DISCOVERY messages...',
|
|
discoveredNodes: Array.from(discoveredNodes.keys())
|
|
});
|
|
}
|
|
|
|
const systemStatus = await sporeClient.getSystemStatus();
|
|
res.json(systemStatus);
|
|
} catch (error) {
|
|
console.error('Error fetching system status:', error);
|
|
res.status(500).json({
|
|
error: 'Failed to fetch system status',
|
|
message: error.message
|
|
});
|
|
}
|
|
});
|
|
|
|
// Proxy endpoint to get status from a specific node
|
|
app.get('/api/node/status/:ip', async (req, res) => {
|
|
try {
|
|
const nodeIp = req.params.ip;
|
|
|
|
// Create a temporary client for the specific node
|
|
const nodeClient = new SporeApiClient(`http://${nodeIp}`);
|
|
const nodeStatus = await nodeClient.getSystemStatus();
|
|
|
|
res.json(nodeStatus);
|
|
} catch (error) {
|
|
console.error(`Error fetching status from node ${req.params.ip}:`, error);
|
|
res.status(500).json({
|
|
error: `Failed to fetch status from node ${req.params.ip}`,
|
|
message: error.message
|
|
});
|
|
}
|
|
});
|
|
|
|
// File upload endpoint for firmware updates
|
|
app.post('/api/node/update', async (req, res) => {
|
|
try {
|
|
const nodeIp = req.query.ip || req.headers['x-node-ip'];
|
|
|
|
if (!nodeIp) {
|
|
return res.status(400).json({
|
|
error: 'Node IP address is required',
|
|
message: 'Please provide the target node IP address'
|
|
});
|
|
}
|
|
|
|
// Check if we have a file in the request
|
|
if (!req.files || !req.files.file) {
|
|
console.log('File upload request received but no file found:', {
|
|
hasFiles: !!req.files,
|
|
fileKeys: req.files ? Object.keys(req.files) : [],
|
|
contentType: req.headers['content-type']
|
|
});
|
|
return res.status(400).json({
|
|
error: 'No file data received',
|
|
message: 'Please select a firmware file to upload'
|
|
});
|
|
}
|
|
|
|
const uploadedFile = req.files.file;
|
|
console.log(`File upload received:`, {
|
|
nodeIp: nodeIp,
|
|
filename: uploadedFile.name,
|
|
fileSize: uploadedFile.data.length,
|
|
mimetype: uploadedFile.mimetype,
|
|
encoding: uploadedFile.encoding
|
|
});
|
|
|
|
// Create a temporary client for the specific node
|
|
const nodeClient = new SporeApiClient(`http://${nodeIp}`);
|
|
console.log(`Created SPORE client for node ${nodeIp}`);
|
|
|
|
// Send the firmware data to the node
|
|
console.log(`Starting firmware upload to SPORE device ${nodeIp}...`);
|
|
|
|
try {
|
|
const updateResult = await nodeClient.updateFirmware(uploadedFile.data, uploadedFile.name);
|
|
console.log(`Firmware upload to SPORE device ${nodeIp} completed successfully:`, updateResult);
|
|
|
|
res.json({
|
|
success: true,
|
|
message: 'Firmware uploaded successfully',
|
|
nodeIp: nodeIp,
|
|
fileSize: uploadedFile.data.length,
|
|
filename: uploadedFile.name,
|
|
result: updateResult
|
|
});
|
|
} catch (uploadError) {
|
|
console.error(`Firmware upload to SPORE device ${nodeIp} failed:`, uploadError);
|
|
throw new Error(`SPORE device upload failed: ${uploadError.message}`);
|
|
}
|
|
|
|
} catch (error) {
|
|
console.error('Error uploading firmware:', error);
|
|
res.status(500).json({
|
|
error: 'Failed to upload firmware',
|
|
message: error.message
|
|
});
|
|
}
|
|
});
|
|
|
|
// Health check endpoint
|
|
app.get('/api/health', (req, res) => {
|
|
const health = {
|
|
status: 'healthy',
|
|
timestamp: new Date().toISOString(),
|
|
services: {
|
|
http: true,
|
|
udp: udpServer.listening,
|
|
sporeClient: !!sporeClient
|
|
},
|
|
discovery: {
|
|
totalNodes: discoveredNodes.size,
|
|
primaryNode: primaryNodeIp,
|
|
udpPort: UDP_PORT,
|
|
serverRunning: udpServer.listening
|
|
}
|
|
};
|
|
|
|
// If no nodes discovered, mark as degraded
|
|
if (discoveredNodes.size === 0) {
|
|
health.status = 'degraded';
|
|
health.message = 'No SPORE nodes discovered yet';
|
|
}
|
|
|
|
// If no client initialized, mark as degraded
|
|
if (!sporeClient) {
|
|
health.status = 'degraded';
|
|
health.message = health.message ?
|
|
`${health.message}; SPORE client not initialized` :
|
|
'SPORE client not initialized';
|
|
}
|
|
|
|
const statusCode = health.status === 'healthy' ? 200 : 503;
|
|
res.status(statusCode).json(health);
|
|
});
|
|
|
|
|
|
|
|
// Start the server
|
|
const server = app.listen(PORT, () => {
|
|
console.log(`Server is running on http://localhost:${PORT}`);
|
|
console.log(`UDP discovery server listening on port ${UDP_PORT}`);
|
|
console.log('Waiting for CLUSTER_DISCOVERY messages from SPORE nodes...');
|
|
});
|
|
|
|
// Graceful shutdown handling
|
|
process.on('SIGINT', () => {
|
|
console.log('\nReceived SIGINT. Shutting down gracefully...');
|
|
udpServer.close(() => {
|
|
console.log('UDP server closed.');
|
|
});
|
|
server.close(() => {
|
|
console.log('HTTP server closed.');
|
|
process.exit(0);
|
|
});
|
|
});
|
|
|
|
process.on('SIGTERM', () => {
|
|
console.log('\nReceived SIGTERM. Shutting down gracefully...');
|
|
udpServer.close(() => {
|
|
console.log('UDP server closed.');
|
|
});
|
|
server.close(() => {
|
|
console.log('HTTP server closed.');
|
|
process.exit(0);
|
|
});
|
|
});
|
|
|
|
// Handle uncaught exceptions
|
|
process.on('uncaughtException', (err) => {
|
|
console.error('Uncaught Exception:', err);
|
|
udpServer.close();
|
|
server.close();
|
|
process.exit(1);
|
|
});
|
|
|
|
process.on('unhandledRejection', (reason, promise) => {
|
|
console.error('Unhandled Rejection at:', promise, 'reason:', reason);
|
|
udpServer.close();
|
|
server.close();
|
|
process.exit(1);
|
|
});
|