Files
spore-ui/index.js

524 lines
15 KiB
JavaScript

const express = require('express');
const path = require('path');
const fs = require('fs');
const dgram = require('dgram');
const SporeApiClient = require('./src/client');
const app = express();
const PORT = process.env.PORT || 3001;
// UDP discovery configuration
const UDP_PORT = 4210;
const DISCOVERY_MESSAGE = 'CLUSTER_DISCOVERY';
// Initialize UDP server for auto discovery
const udpServer = dgram.createSocket('udp4');
// Store discovered nodes and their IPs
const discoveredNodes = new Map();
let primaryNodeIp = null;
// UDP server event handlers
udpServer.on('error', (err) => {
if (err.code === 'EADDRINUSE') {
console.error(`UDP port ${UDP_PORT} is already in use. Please check if another instance is running.`);
} else {
console.error('UDP Server error:', err);
}
udpServer.close();
});
udpServer.on('message', (msg, rinfo) => {
try {
const message = msg.toString().trim();
const sourceIp = rinfo.address;
const sourcePort = rinfo.port;
//console.log(`UDP message received from ${sourceIp}:${sourcePort}: "${message}"`);
if (message === DISCOVERY_MESSAGE) {
//console.log(`Received CLUSTER_DISCOVERY from ${sourceIp}:${sourcePort}`);
// Store the discovered node
const nodeInfo = {
ip: sourceIp,
port: sourcePort,
discoveredAt: new Date(),
lastSeen: new Date()
};
discoveredNodes.set(sourceIp, nodeInfo);
// Set as primary node if this is the first one or if we don't have one
if (!primaryNodeIp) {
primaryNodeIp = sourceIp;
console.log(`Set primary node to ${sourceIp}`);
// Immediately try to initialize the client
updateSporeClient();
}
// Update last seen timestamp
discoveredNodes.get(sourceIp).lastSeen = new Date();
//console.log(`Node ${sourceIp} added/updated. Total discovered nodes: ${discoveredNodes.size}`);
} else {
console.log(`Received unknown message from ${sourceIp}:${sourcePort}: "${message}"`);
}
} catch (error) {
console.error('Error processing UDP message:', error);
}
});
udpServer.on('listening', () => {
const address = udpServer.address();
console.log(`UDP discovery server listening on ${address.address}:${address.port}`);
});
// Bind UDP server to listen for discovery messages
udpServer.bind(UDP_PORT, () => {
console.log(`UDP discovery server bound to port ${UDP_PORT}`);
});
// Initialize the SPORE API client with dynamic IP
let sporeClient = null;
// Function to initialize or update the SporeApiClient
function initializeSporeClient(nodeIp) {
if (!nodeIp) {
console.warn('No node IP available for SporeApiClient initialization');
return null;
}
try {
const client = new SporeApiClient(`http://${nodeIp}`);
console.log(`Initialized SporeApiClient with node IP: ${nodeIp}`);
return client;
} catch (error) {
console.error(`Failed to initialize SporeApiClient with IP ${nodeIp}:`, error);
return null;
}
}
// Function to clean up stale discovered nodes (nodes not seen in the last 5 minutes)
function cleanupStaleNodes() {
const now = new Date();
const staleThreshold = 5 * 60 * 1000; // 5 minutes in milliseconds
for (const [ip, node] of discoveredNodes.entries()) {
if (now - node.lastSeen > staleThreshold) {
console.log(`Removing stale node: ${ip} (last seen: ${node.lastSeen.toISOString()})`);
discoveredNodes.delete(ip);
// If this was our primary node, clear it
if (primaryNodeIp === ip) {
primaryNodeIp = null;
console.log('Primary node became stale, clearing primary node selection');
}
}
}
}
// Function to select the best primary node
function selectBestPrimaryNode() {
if (discoveredNodes.size === 0) {
return null;
}
// If we already have a valid primary node, keep it
if (primaryNodeIp && discoveredNodes.has(primaryNodeIp)) {
return primaryNodeIp;
}
// Select the most recently seen node as primary
let bestNode = null;
let mostRecent = new Date(0);
for (const [ip, node] of discoveredNodes.entries()) {
if (node.lastSeen > mostRecent) {
mostRecent = node.lastSeen;
bestNode = ip;
}
}
if (bestNode && bestNode !== primaryNodeIp) {
primaryNodeIp = bestNode;
console.log(`Selected new primary node: ${bestNode}`);
}
return bestNode;
}
// Initialize client when a node is discovered
function updateSporeClient() {
const nodeIp = selectBestPrimaryNode();
if (nodeIp) {
sporeClient = initializeSporeClient(nodeIp);
}
}
// Set up periodic tasks
setInterval(() => {
cleanupStaleNodes();
if (!sporeClient || !primaryNodeIp || !discoveredNodes.has(primaryNodeIp)) {
updateSporeClient();
}
}, 5000); // Check every 5 seconds
// Serve static files from public directory
app.use(express.static(path.join(__dirname, 'public')));
// Serve the main HTML page
app.get('/', (req, res) => {
res.sendFile(path.join(__dirname, 'public', 'index.html'));
});
// API endpoint to get discovered nodes
app.get('/api/discovery/nodes', (req, res) => {
const nodes = Array.from(discoveredNodes.values()).map(node => ({
...node,
discoveredAt: node.discoveredAt.toISOString(),
lastSeen: node.lastSeen.toISOString(),
isPrimary: node.ip === primaryNodeIp
}));
res.json({
primaryNode: primaryNodeIp,
totalNodes: discoveredNodes.size,
nodes: nodes,
clientInitialized: !!sporeClient,
clientBaseUrl: sporeClient ? sporeClient.baseUrl : null,
discoveryStatus: {
udpPort: UDP_PORT,
discoveryMessage: DISCOVERY_MESSAGE,
serverRunning: udpServer.listening
}
});
});
// API endpoint to manually trigger discovery refresh
app.post('/api/discovery/refresh', (req, res) => {
try {
// Clean up stale nodes
cleanupStaleNodes();
// Try to update the client
updateSporeClient();
res.json({
success: true,
message: 'Discovery refresh completed',
primaryNode: primaryNodeIp,
totalNodes: discoveredNodes.size,
clientInitialized: !!sporeClient
});
} catch (error) {
console.error('Error during discovery refresh:', error);
res.status(500).json({
error: 'Discovery refresh failed',
message: error.message
});
}
});
// API endpoint to manually set primary node
app.post('/api/discovery/primary/:ip', (req, res) => {
try {
const requestedIp = req.params.ip;
if (!discoveredNodes.has(requestedIp)) {
return res.status(404).json({
error: 'Node not found',
message: `Node with IP ${requestedIp} has not been discovered`
});
}
primaryNodeIp = requestedIp;
updateSporeClient();
res.json({
success: true,
message: `Primary node set to ${requestedIp}`,
primaryNode: primaryNodeIp,
clientInitialized: !!sporeClient
});
} catch (error) {
console.error('Error setting primary node:', error);
res.status(500).json({
error: 'Failed to set primary node',
message: error.message
});
}
});
// API endpoint to get cluster members
app.get('/api/cluster/members', async (req, res) => {
try {
if (!sporeClient) {
return res.status(503).json({
error: 'Service unavailable',
message: 'No SPORE nodes discovered yet. Waiting for CLUSTER_DISCOVERY messages...',
discoveredNodes: Array.from(discoveredNodes.keys())
});
}
const members = await sporeClient.getClusterStatus();
res.json(members);
} catch (error) {
console.error('Error fetching cluster members:', error);
res.status(500).json({
error: 'Failed to fetch cluster members',
message: error.message
});
}
});
// API endpoint to get task status
app.get('/api/tasks/status', async (req, res) => {
try {
if (!sporeClient) {
return res.status(503).json({
error: 'Service unavailable',
message: 'No SPORE nodes discovered yet. Waiting for CLUSTER_DISCOVERY messages...',
discoveredNodes: Array.from(discoveredNodes.keys())
});
}
const taskStatus = await sporeClient.getTaskStatus();
res.json(taskStatus);
} catch (error) {
console.error('Error fetching task status:', error);
res.status(500).json({
error: 'Failed to fetch task status',
message: error.message
});
}
});
// API endpoint to get system status
app.get('/api/node/status', async (req, res) => {
try {
if (!sporeClient) {
return res.status(503).json({
error: 'Service unavailable',
message: 'No SPORE nodes discovered yet. Waiting for CLUSTER_DISCOVERY messages...',
discoveredNodes: Array.from(discoveredNodes.keys())
});
}
const systemStatus = await sporeClient.getSystemStatus();
res.json(systemStatus);
} catch (error) {
console.error('Error fetching system status:', error);
res.status(500).json({
error: 'Failed to fetch system status',
message: error.message
});
}
});
// Proxy endpoint to get status from a specific node
app.get('/api/node/status/:ip', async (req, res) => {
try {
const nodeIp = req.params.ip;
// Create a temporary client for the specific node
const nodeClient = new SporeApiClient(`http://${nodeIp}`);
const nodeStatus = await nodeClient.getSystemStatus();
res.json(nodeStatus);
} catch (error) {
console.error(`Error fetching status from node ${req.params.ip}:`, error);
res.status(500).json({
error: `Failed to fetch status from node ${req.params.ip}`,
message: error.message
});
}
});
// File upload endpoint for firmware updates
app.post('/api/node/update', express.raw({ type: 'multipart/form-data', limit: '50mb' }), async (req, res) => {
try {
const nodeIp = req.query.ip || req.headers['x-node-ip'];
if (!nodeIp) {
return res.status(400).json({
error: 'Node IP address is required',
message: 'Please provide the target node IP address'
});
}
// Parse multipart form data manually
const boundary = req.headers['content-type']?.split('boundary=')[1];
if (!boundary) {
return res.status(400).json({
error: 'Invalid content type',
message: 'Expected multipart/form-data with boundary'
});
}
// Parse the multipart data to extract the file
const fileData = parseMultipartData(req.body, boundary);
if (!fileData.file) {
return res.status(400).json({
error: 'No file data received',
message: 'Please select a firmware file to upload'
});
}
console.log(`Uploading firmware to node ${nodeIp}, file size: ${fileData.file.data.length} bytes, filename: ${fileData.file.filename}`);
// Create a temporary client for the specific node
const nodeClient = new SporeApiClient(`http://${nodeIp}`);
// Send the firmware data to the node using multipart form data
const updateResult = await nodeClient.updateFirmware(fileData.file.data, fileData.file.filename);
res.json({
success: true,
message: 'Firmware uploaded successfully',
nodeIp: nodeIp,
fileSize: fileData.file.data.length,
filename: fileData.file.filename,
result: updateResult
});
} catch (error) {
console.error('Error uploading firmware:', error);
res.status(500).json({
error: 'Failed to upload firmware',
message: error.message
});
}
});
// Health check endpoint
app.get('/api/health', (req, res) => {
const health = {
status: 'healthy',
timestamp: new Date().toISOString(),
services: {
http: true,
udp: udpServer.listening,
sporeClient: !!sporeClient
},
discovery: {
totalNodes: discoveredNodes.size,
primaryNode: primaryNodeIp,
udpPort: UDP_PORT,
serverRunning: udpServer.listening
}
};
// If no nodes discovered, mark as degraded
if (discoveredNodes.size === 0) {
health.status = 'degraded';
health.message = 'No SPORE nodes discovered yet';
}
// If no client initialized, mark as degraded
if (!sporeClient) {
health.status = 'degraded';
health.message = health.message ?
`${health.message}; SPORE client not initialized` :
'SPORE client not initialized';
}
const statusCode = health.status === 'healthy' ? 200 : 503;
res.status(statusCode).json(health);
});
// Helper function to parse multipart form data
function parseMultipartData(buffer, boundary) {
const data = {};
const boundaryBuffer = Buffer.from(`--${boundary}`);
const endBoundaryBuffer = Buffer.from(`--${boundary}--`);
let start = 0;
let end = 0;
while (true) {
// Find the start of the next part
start = buffer.indexOf(boundaryBuffer, end);
if (start === -1) break;
// Find the end of this part
end = buffer.indexOf(boundaryBuffer, start + boundaryBuffer.length);
if (end === -1) {
end = buffer.indexOf(endBoundaryBuffer, start + boundaryBuffer.length);
if (end === -1) break;
}
// Extract the part content
const partBuffer = buffer.slice(start + boundaryBuffer.length + 2, end);
// Parse headers and content
const headerEnd = partBuffer.indexOf('\r\n\r\n');
if (headerEnd === -1) continue;
const headers = partBuffer.slice(0, headerEnd).toString();
const content = partBuffer.slice(headerEnd + 4);
// Parse the Content-Disposition header to get field name and filename
const nameMatch = headers.match(/name="([^"]+)"/);
const filenameMatch = headers.match(/filename="([^"]+)"/);
if (nameMatch) {
const fieldName = nameMatch[1];
const filename = filenameMatch ? filenameMatch[1] : null;
data[fieldName] = {
data: content,
filename: filename
};
}
}
return data;
}
// Start the server
const server = app.listen(PORT, () => {
console.log(`Server is running on http://localhost:${PORT}`);
console.log(`UDP discovery server listening on port ${UDP_PORT}`);
console.log('Waiting for CLUSTER_DISCOVERY messages from SPORE nodes...');
});
// Graceful shutdown handling
process.on('SIGINT', () => {
console.log('\nReceived SIGINT. Shutting down gracefully...');
udpServer.close(() => {
console.log('UDP server closed.');
});
server.close(() => {
console.log('HTTP server closed.');
process.exit(0);
});
});
process.on('SIGTERM', () => {
console.log('\nReceived SIGTERM. Shutting down gracefully...');
udpServer.close(() => {
console.log('UDP server closed.');
});
server.close(() => {
console.log('HTTP server closed.');
process.exit(0);
});
});
// Handle uncaught exceptions
process.on('uncaughtException', (err) => {
console.error('Uncaught Exception:', err);
udpServer.close();
server.close();
process.exit(1);
});
process.on('unhandledRejection', (reason, promise) => {
console.error('Unhandled Rejection at:', promise, 'reason:', reason);
udpServer.close();
server.close();
process.exit(1);
});