Files
spore-ui/index.js

1164 lines
36 KiB
JavaScript

const express = require('express');
const path = require('path');
const fs = require('fs');
const dgram = require('dgram');
const SporeApiClient = require('./src/client');
const cors = require('cors');
const WebSocket = require('ws');
/**
 * Minimal console logger with a gated debug level.
 *
 * - `debug` prints (via console.log) only when LOG_LEVEL is "debug" or
 *   NODE_ENV is "development"; the environment is re-read on every call.
 * - `info`, `warn` and `error` always print, each with its own tag.
 */
const logger = {
  debug(...args) {
    const { LOG_LEVEL, NODE_ENV } = process.env;
    const debugEnabled = LOG_LEVEL === 'debug' || NODE_ENV === 'development';
    if (!debugEnabled) {
      return;
    }
    console.log('[DEBUG]', ...args);
  },
  info(...args) {
    console.log('[INFO]', ...args);
  },
  warn(...args) {
    console.warn('[WARN]', ...args);
  },
  error(...args) {
    console.error('[ERROR]', ...args);
  }
};
// Express application; the HTTP port comes from the PORT env var, falling back to 3001
const app = express();
const PORT = process.env.PORT || 3001;

// Request body parsers: JSON first, then URL-encoded forms (extended syntax)
app.use(express.json());
app.use(express.urlencoded({ extended: true }));

// Multipart upload support; requests exceeding the size limit are aborted
// with the configured response text
const fileUpload = require('express-fileupload');
const fileUploadOptions = {
  limits: { fileSize: 50 * 1024 * 1024 }, // 50MB limit
  abortOnLimit: true,
  responseOnLimit: 'File size limit has been reached',
  debug: false
};
app.use(fileUpload(fileUploadOptions));

// CORS: every origin is accepted (narrow `origin` to specific hosts if needed)
const corsOptions = {
  origin: '*',
  methods: ['GET', 'POST', 'PUT', 'DELETE'],
  allowedHeaders: ['Content-Type', 'Authorization']
};
app.use(cors(corsOptions));
// --- UDP heartbeat configuration and shared cluster state ---

// Port on which heartbeat datagrams are received
const UDP_PORT = 4210;
// A node is treated as stale once it has been silent for more than this many
// seconds (8s leaves headroom over the 5-second heartbeat interval)
const STALE_THRESHOLD_SECONDS = 8;

// IPv4 datagram socket used for heartbeat-based cluster management
const udpServer = dgram.createSocket({ type: 'udp4' });

// Known nodes keyed by source IP, plus the IP of the node currently used as primary
const discoveredNodes = new Map();
let primaryNodeIp = null;
// UDP server event handlers

/**
 * Logs a UDP socket error (with a dedicated hint when the port is already
 * taken) and then closes the socket.
 * @param {Error & {code?: string}} err - Error emitted by the UDP socket.
 */
function handleUdpServerError(err) {
  const portInUse = err.code === 'EADDRINUSE';
  if (portInUse) {
    console.error(`UDP port ${UDP_PORT} is already in use. Please check if another instance is running.`);
  } else {
    console.error('UDP Server error:', err);
  }
  udpServer.close();
}
udpServer.on('error', handleUdpServerError);
// Dispatch each incoming datagram by its textual prefix:
//   CLUSTER_HEARTBEAT:<hostname>  -> refresh/register the sending node
//   NODE_UPDATE:...               -> merge detailed node info
//   RAW:...                       -> silently ignored
//   anything else                 -> logged as unknown
// Any exception raised while handling a datagram is logged and swallowed.
udpServer.on('message', (msg, rinfo) => {
  const HEARTBEAT_PREFIX = 'CLUSTER_HEARTBEAT:';
  try {
    const text = msg.toString().trim();
    const { address: sourceIp, port: sourcePort } = rinfo;
    console.log(`📨 UDP message received from ${sourceIp}:${sourcePort}: "${text}"`);

    if (text.startsWith(HEARTBEAT_PREFIX)) {
      // Everything after the prefix is the sender's hostname
      updateNodeFromHeartbeat(sourceIp, sourcePort, text.slice(HEARTBEAT_PREFIX.length));
      return;
    }
    if (text.startsWith('NODE_UPDATE:')) {
      handleNodeUpdate(sourceIp, text);
      return;
    }
    if (text.startsWith('RAW:')) {
      return;
    }
    console.log(`Received unknown message from ${sourceIp}:${sourcePort}: "${text}"`);
  } catch (error) {
    console.error('Error processing UDP message:', error);
  }
});
// Report the actual bound address once the socket starts listening
udpServer.on('listening', () => {
  const { address: host, port } = udpServer.address();
  console.log(`UDP heartbeat server listening on ${host}:${port}`);
});

// Bind the socket so heartbeat messages can be received
function onUdpBound() {
  console.log(`UDP heartbeat server bound to port ${UDP_PORT}`);
}
udpServer.bind(UDP_PORT, onUdpBound);
// SPORE API client bound to the current primary node (null until one is known)
let sporeClient = null;

/**
 * Builds a SporeApiClient pointed at `http://<nodeIp>`.
 *
 * @param {string} nodeIp - IP (or host) of the node to talk to.
 * @returns {object|null} The new client, or null when `nodeIp` is falsy or
 *   construction fails (the failure is logged, never thrown).
 */
function initializeSporeClient(nodeIp) {
  if (!nodeIp) {
    console.warn('No node IP available for SporeApiClient initialization');
    return null;
  }

  let created = null;
  try {
    created = new SporeApiClient(`http://${nodeIp}`);
    console.log(`Initialized SporeApiClient with node IP: ${nodeIp}`);
  } catch (error) {
    console.error(`Failed to initialize SporeApiClient with IP ${nodeIp}:`, error);
    return null;
  }
  return created;
}
/**
 * Flags nodes whose last heartbeat is older than STALE_THRESHOLD_SECONDS as
 * 'inactive' (entries are kept, not removed).
 *
 * For every newly flagged node a 'stale' discovery event is broadcast. If the
 * flagged node was the primary, the primary is cleared and a replacement is
 * requested. One member-list broadcast is sent at the end when anything changed.
 */
function markStaleNodes() {
  const nowSeconds = Math.floor(Date.now() / 1000);
  let anyMarked = false;

  discoveredNodes.forEach((node, ip) => {
    const silentFor = nowSeconds - node.lastSeen;
    const isOverdue = silentFor > STALE_THRESHOLD_SECONDS;
    if (!isOverdue || node.status === 'inactive') {
      return; // still fresh, or already flagged on an earlier pass
    }

    console.log(`💀 NODE MARKED INACTIVE: ${ip} (${node.hostname || 'Unknown'}) - last seen ${silentFor}s ago (threshold: ${STALE_THRESHOLD_SECONDS}s)`);
    node.status = 'inactive';
    anyMarked = true;

    // Tell connected clients right away
    logger.debug(`📡 Broadcasting stale node event for ${ip}`);
    broadcastNodeDiscovery(ip, 'stale');

    if (primaryNodeIp !== ip) {
      return;
    }

    // The primary itself went quiet: drop it and look for a replacement
    primaryNodeIp = null;
    console.log('🚫 PRIMARY NODE BECAME STALE: Clearing primary node selection');
    const replacement = selectBestPrimaryNode();
    if (!replacement) {
      console.log('⚠️ No healthy nodes available for primary selection');
      return;
    }
    console.log(`✅ NEW PRIMARY NODE SELECTED: ${replacement} (auto-selected after stale cleanup)`);
    // Point the SPORE client at the replacement
    updateSporeClient();
  });

  if (anyMarked) {
    broadcastMemberListChange('nodes marked stale');
  }
}
/**
 * Chooses the node that should act as primary.
 *
 * - Returns null when no nodes are known.
 * - Keeps the current primary as long as it is still present in
 *   `discoveredNodes` (this also preserves a primary chosen explicitly).
 * - Otherwise picks the most recently seen node that is NOT marked
 *   'inactive', stores it in `primaryNodeIp` and broadcasts the change.
 *
 * Nodes flagged 'inactive' by stale detection are skipped so a primary that
 * just went stale (or any other silent node) is never auto-selected again.
 *
 * @returns {string|null} IP of the primary node, or null when no node is
 *   known or every known node is inactive.
 */
function selectBestPrimaryNode() {
  if (discoveredNodes.size === 0) {
    return null;
  }
  // If we already have a valid primary node, keep it
  if (primaryNodeIp && discoveredNodes.has(primaryNodeIp)) {
    return primaryNodeIp;
  }
  // Select the most recently seen healthy node. `lastSeen` is epoch seconds,
  // so the running maximum is a plain number starting at 0.
  let bestNode = null;
  let mostRecent = 0;
  for (const [ip, node] of discoveredNodes.entries()) {
    if (node.status === 'inactive') {
      continue;
    }
    if (node.lastSeen > mostRecent) {
      mostRecent = node.lastSeen;
      bestNode = ip;
    }
  }
  if (bestNode && bestNode !== primaryNodeIp) {
    primaryNodeIp = bestNode;
    console.log(`Selected new primary node: ${bestNode}`);
    broadcastMemberListChange('primary node change');
  }
  return bestNode;
}
/**
 * Switches the primary to a randomly chosen node other than the current one.
 *
 * @returns {string|null} The newly chosen IP; the unchanged current primary
 *   when it is the only known node; null when no nodes are known.
 */
function selectRandomPrimaryNode() {
  if (discoveredNodes.size === 0) {
    return null;
  }

  // Every known IP except the one that is primary right now
  const candidates = [];
  for (const ip of discoveredNodes.keys()) {
    if (ip !== primaryNodeIp) {
      candidates.push(ip);
    }
  }
  if (candidates.length === 0) {
    // Nothing else to switch to
    return primaryNodeIp;
  }

  const chosen = candidates[Math.floor(Math.random() * candidates.length)];
  primaryNodeIp = chosen;
  console.log(`Randomly selected new primary node: ${chosen}`);
  broadcastMemberListChange('random primary node selection');
  return chosen;
}
/**
 * Re-creates `sporeClient` for whichever node `selectBestPrimaryNode()`
 * returns. When no node is selected the existing client is left untouched.
 */
function updateSporeClient() {
  const primaryIp = selectBestPrimaryNode();
  if (!primaryIp) {
    return;
  }
  sporeClient = initializeSporeClient(primaryIp);
}
/**
 * Runs `operation(client, ip)` against the primary node and, if that fails,
 * against every other discovered node (most recently seen first) until one
 * succeeds.
 *
 * When a non-primary node succeeds it is promoted: `primaryNodeIp` and
 * `sporeClient` are switched to it and a member-list change is broadcast.
 *
 * @param {(client: object, ip: string) => Promise<*>} operation - Work to attempt per node.
 * @returns {Promise<*>} Result of the first successful attempt.
 * @throws {Error} 'No SPORE nodes discovered' when there are no candidates;
 *   otherwise the error from the last failed attempt.
 */
async function performWithFailover(operation) {
  // Non-primary nodes ordered by recency of their last heartbeat
  const fallbackIps = [...discoveredNodes.values()]
    .filter((node) => node.ip !== primaryNodeIp)
    .sort((left, right) => right.lastSeen - left.lastSeen)
    .map((node) => node.ip);

  // The primary goes first, but only while it is still a known node
  const primaryIsKnown = primaryNodeIp && discoveredNodes.has(primaryNodeIp);
  const candidateIps = primaryIsKnown ? [primaryNodeIp, ...fallbackIps] : fallbackIps;

  if (candidateIps.length === 0) {
    throw new Error('No SPORE nodes discovered');
  }

  let lastError = null;
  for (const ip of candidateIps) {
    try {
      // Reuse the shared client for the primary; build a fresh one otherwise
      const reuseShared = sporeClient && ip === primaryNodeIp;
      const client = reuseShared ? sporeClient : initializeSporeClient(ip);
      if (!client) {
        throw new Error(`Failed to initialize client for ${ip}`);
      }

      const result = await operation(client, ip);

      const servedByPrimary = ip === primaryNodeIp;
      if (!servedByPrimary) {
        primaryNodeIp = ip;
        sporeClient = client;
        logger.info(`Failover: switched primary node to ${ip}`);
        broadcastMemberListChange('failover primary node switch');
      }
      return result;
    } catch (err) {
      console.warn(`Primary attempt on ${ip} failed: ${err.message}`);
      lastError = err;
    }
  }
  throw lastError || new Error('All discovered nodes failed');
}
/**
 * Applies a CLUSTER_HEARTBEAT to the node table.
 *
 * Unknown sender: a new 'active' entry is stored, it becomes primary when no
 * primary is set, a 'discovered' event is broadcast immediately and a
 * member-list broadcast follows after one second.
 *
 * Known sender: `lastSeen`, `hostname` and `status` are refreshed and a
 * member-list broadcast is sent with a reason describing what changed.
 *
 * @param {string} sourceIp - Address the datagram came from.
 * @param {number} sourcePort - Port the datagram came from.
 * @param {string} hostname - Hostname carried in the heartbeat.
 */
function updateNodeFromHeartbeat(sourceIp, sourcePort, hostname) {
  const knownNode = discoveredNodes.get(sourceIp);
  const nowSeconds = Math.floor(Date.now() / 1000);

  if (!knownNode) {
    // First heartbeat from this address - NEW NODE DISCOVERED
    discoveredNodes.set(sourceIp, {
      ip: sourceIp,
      port: sourcePort,
      hostname: hostname,
      status: 'active',
      discoveredAt: nowSeconds,
      lastSeen: nowSeconds
    });
    console.log(`🆕 NEW NODE DISCOVERED: ${sourceIp}:${sourcePort} (${hostname}) via heartbeat. Total nodes: ${discoveredNodes.size}`);

    // Adopt it as primary when there is none yet
    if (!primaryNodeIp) {
      primaryNodeIp = sourceIp;
      console.log(`Set primary node to ${sourceIp} (from heartbeat)`);
      updateSporeClient();
    }

    broadcastNodeDiscovery(sourceIp, 'discovered');
    // Delay the member-list broadcast briefly so member data can be fetched
    setTimeout(() => broadcastMemberListChange('new heartbeat discovery'), 1000);
    return;
  }

  // Refresh the existing entry; remember what it looked like beforehand
  const previousHostname = knownNode.hostname;
  const wasInactive = knownNode.status === 'inactive';
  knownNode.lastSeen = nowSeconds;
  knownNode.hostname = hostname;
  knownNode.status = 'active'; // any heartbeat revives the node
  logger.debug(`💓 Heartbeat from ${sourceIp}:${sourcePort} (${hostname}). Total nodes: ${discoveredNodes.size}`);

  const hostnameChanged = previousHostname !== hostname;
  if (hostnameChanged) {
    console.log(`🔄 Hostname updated for ${sourceIp}: "${previousHostname}" -> "${hostname}"`);
  }

  // Reason priority: revival, then rename, then plain heartbeat
  let reason = 'active heartbeat';
  if (wasInactive) {
    reason = 'node became active';
  } else if (hostnameChanged) {
    reason = 'hostname update';
  }
  logger.debug(`📡 Broadcasting heartbeat update: ${reason}`);
  broadcastMemberListChange(reason);
}
/**
 * Merges a NODE_UPDATE datagram into an already-known node.
 *
 * Expected wire format: "NODE_UPDATE:<hostname>:<json>". The JSON part may
 * itself contain colons. Truthy `hostname`, `uptime` and `labels` fields from
 * the JSON overwrite the stored values; the node is then marked active and a
 * member-list change is broadcast. Updates from unknown senders and payloads
 * that fail to parse are logged and ignored.
 *
 * @param {string} sourceIp - Address the datagram came from.
 * @param {string} message - Full datagram text including the prefix.
 */
function handleNodeUpdate(sourceIp, message) {
  const segments = message.split(':');
  if (segments.length < 3) {
    console.warn(`Invalid NODE_UPDATE message format: ${message}`);
    return;
  }

  // segments[0] is the "NODE_UPDATE" tag; rejoin the rest so colons inside
  // the JSON survive the split
  const [, hostname, ...payloadSegments] = segments;
  const jsonData = payloadSegments.join(':');

  try {
    const nodeData = JSON.parse(jsonData);
    const knownNode = discoveredNodes.get(sourceIp);
    if (!knownNode) {
      console.warn(`Received NODE_UPDATE for unknown node: ${sourceIp} (${hostname})`);
      return;
    }

    // Copy over only the fields the sender actually provided
    for (const field of ['hostname', 'uptime', 'labels']) {
      if (nodeData[field]) {
        knownNode[field] = nodeData[field];
      }
    }
    knownNode.lastSeen = Math.floor(Date.now() / 1000);
    knownNode.status = 'active';
    console.log(`🔄 Updated node ${sourceIp} (${hostname}) from NODE_UPDATE`);
    broadcastMemberListChange('node update');
  } catch (error) {
    console.error(`Error parsing NODE_UPDATE JSON: ${error.message}`);
  }
}
// Periodic maintenance, every 2 seconds for fast stale detection:
// flag silent nodes, then re-select a primary / rebuild the client whenever
// the client is missing or the primary is unset or no longer a known node.
function runPeriodicMaintenance() {
  markStaleNodes();
  const primaryUsable = sporeClient && primaryNodeIp && discoveredNodes.has(primaryNodeIp);
  if (!primaryUsable) {
    updateSporeClient();
  }
}
setInterval(runPeriodicMaintenance, 2000);
// Static assets live in ./public next to this file
const publicDir = path.join(__dirname, 'public');
app.use(express.static(publicDir));

// The root URL serves the main HTML page
app.get('/', (req, res) => {
  const indexPage = path.join(publicDir, 'index.html');
  res.sendFile(indexPage);
});
// GET /api/discovery/nodes
// Returns the heartbeat-derived node table. Each node is a copy of the stored
// entry with `discoveredAt`/`lastSeen` (stored as epoch seconds) rendered as
// ISO-8601 strings and an `isPrimary` flag, plus primary/client/UDP summary info.
app.get('/api/discovery/nodes', (req, res) => {
  const secondsToIso = (epochSeconds) => new Date(epochSeconds * 1000).toISOString();

  const nodes = [];
  for (const node of discoveredNodes.values()) {
    nodes.push({
      ...node,
      discoveredAt: secondsToIso(node.discoveredAt),
      lastSeen: secondsToIso(node.lastSeen),
      isPrimary: node.ip === primaryNodeIp
    });
  }

  const clusterStatus = {
    udpPort: UDP_PORT,
    serverRunning: udpServer.listening
  };
  res.json({
    primaryNode: primaryNodeIp,
    totalNodes: discoveredNodes.size,
    nodes: nodes,
    clientInitialized: !!sporeClient,
    clientBaseUrl: sporeClient ? sporeClient.baseUrl : null,
    clusterStatus: clusterStatus
  });
});
// POST /api/discovery/refresh
// Runs stale detection, refreshes the SPORE client, pushes a WebSocket
// cluster update and reports the resulting primary/node count.
// Responds 500 if any of those steps throws.
app.post('/api/discovery/refresh', (req, res) => {
  try {
    markStaleNodes();
    updateSporeClient();
    broadcastMemberListChange('manual refresh');

    const summary = {
      success: true,
      message: 'Cluster refresh completed',
      primaryNode: primaryNodeIp,
      totalNodes: discoveredNodes.size,
      clientInitialized: !!sporeClient
    };
    res.json(summary);
  } catch (error) {
    console.error('Error during cluster refresh:', error);
    const failure = { error: 'Cluster refresh failed', message: error.message };
    res.status(500).json(failure);
  }
});
// POST /api/test/websocket
// Diagnostic endpoint: triggers a member-list broadcast and reports how many
// WebSocket clients and nodes are currently tracked. Responds 500 on error.
app.post('/api/test/websocket', (req, res) => {
  try {
    console.log('🧪 Manual WebSocket test triggered');
    broadcastMemberListChange('manual test');

    const report = {
      success: true,
      message: 'WebSocket test broadcast sent',
      websocketClients: wsClients.size,
      totalNodes: discoveredNodes.size
    };
    res.json(report);
  } catch (error) {
    console.error('Error during WebSocket test:', error);
    const failure = { error: 'WebSocket test failed', message: error.message };
    res.status(500).json(failure);
  }
});
// POST /api/discovery/random-primary
// Picks a random primary node and rebuilds the SPORE client for it.
//   404 - no nodes discovered yet
//   500 - selection returned nothing, or an unexpected error occurred
// The response echoes `timestamp` from the request body when one was sent,
// otherwise the current time in ISO-8601 form.
app.post('/api/discovery/random-primary', (req, res) => {
  try {
    if (discoveredNodes.size === 0) {
      return res.status(404).json({
        error: 'No nodes available',
        message: 'No SPORE nodes have been discovered yet'
      });
    }

    const chosenNode = selectRandomPrimaryNode();
    if (!chosenNode) {
      return res.status(500).json({
        error: 'Selection failed',
        message: 'Failed to select a random primary node'
      });
    }

    // Re-point the client at the new primary
    updateSporeClient();

    const timestamp = (req.body && req.body.timestamp) || new Date().toISOString();
    const outcome = {
      success: true,
      message: `Randomly selected new primary node: ${chosenNode}`,
      primaryNode: primaryNodeIp,
      totalNodes: discoveredNodes.size,
      clientInitialized: !!sporeClient,
      timestamp: timestamp
    };
    res.json(outcome);
  } catch (error) {
    console.error('Error selecting random primary node:', error);
    res.status(500).json({
      error: 'Random selection failed',
      message: error.message
    });
  }
});
// POST /api/discovery/primary/:ip
// Makes the given, already-discovered node the primary, rebuilds the SPORE
// client and broadcasts the change.
//   404 - the IP is not in the discovered-node table
//   500 - unexpected error
app.post('/api/discovery/primary/:ip', (req, res) => {
  try {
    const { ip: requestedIp } = req.params;
    const isKnown = discoveredNodes.has(requestedIp);
    if (!isKnown) {
      return res.status(404).json({
        error: 'Node not found',
        message: `Node with IP ${requestedIp} has not been discovered`
      });
    }

    primaryNodeIp = requestedIp;
    updateSporeClient();
    broadcastMemberListChange('manual primary node setting');

    const outcome = {
      success: true,
      message: `Primary node set to ${requestedIp}`,
      primaryNode: primaryNodeIp,
      clientInitialized: !!sporeClient
    };
    res.json(outcome);
  } catch (error) {
    console.error('Error setting primary node:', error);
    res.status(500).json({
      error: 'Failed to set primary node',
      message: error.message
    });
  }
});
// GET /api/cluster/members
// Proxies the cluster status from the primary node, failing over to other
// discovered nodes when needed.
//   503 - nothing discovered yet
//   502 - every candidate node failed
app.get('/api/cluster/members', async (req, res) => {
  try {
    const nothingDiscovered = discoveredNodes.size === 0;
    if (nothingDiscovered) {
      return res.status(503).json({
        error: 'Service unavailable',
        message: 'No SPORE nodes discovered yet. Waiting for CLUSTER_HEARTBEAT messages...',
        discoveredNodes: Array.from(discoveredNodes.keys())
      });
    }

    const fetchClusterStatus = (client) => client.getClusterStatus();
    const members = await performWithFailover(fetchClusterStatus);
    res.json(members);
  } catch (error) {
    console.error('Error fetching cluster members:', error);
    res.status(502).json({
      error: 'Failed to fetch cluster members',
      message: error.message
    });
  }
});
// GET /api/tasks/status[?ip=<node>]
// With ?ip= the task status is fetched from exactly that node (500 on
// failure). Without it the primary is used with failover:
//   503 - nothing discovered yet
//   502 - every candidate node failed
app.get('/api/tasks/status', async (req, res) => {
  try {
    const targetIp = req.query.ip;

    if (targetIp) {
      // Explicit target: talk to that node only, no failover
      try {
        const directClient = new SporeApiClient(`http://${targetIp}`);
        return res.json(await directClient.getTaskStatus());
      } catch (innerError) {
        console.error('Error fetching task status from specific node:', innerError);
        return res.status(500).json({
          error: 'Failed to fetch task status from node',
          message: innerError.message
        });
      }
    }

    if (discoveredNodes.size === 0) {
      return res.status(503).json({
        error: 'Service unavailable',
        message: 'No SPORE nodes discovered yet. Waiting for CLUSTER_HEARTBEAT messages...',
        discoveredNodes: Array.from(discoveredNodes.keys())
      });
    }

    const fetchTaskStatus = (client) => client.getTaskStatus();
    res.json(await performWithFailover(fetchTaskStatus));
  } catch (error) {
    console.error('Error fetching task status:', error);
    res.status(502).json({
      error: 'Failed to fetch task status',
      message: error.message
    });
  }
});
// GET /api/node/status
// Proxies the system status from the primary node, failing over to other
// discovered nodes when needed.
//   503 - nothing discovered yet
//   502 - every candidate node failed
app.get('/api/node/status', async (req, res) => {
  try {
    if (discoveredNodes.size === 0) {
      return res.status(503).json({
        error: 'Service unavailable',
        // Nodes are discovered via CLUSTER_HEARTBEAT datagrams; the message
        // names that type so it matches the other endpoints and the UDP handler.
        message: 'No SPORE nodes discovered yet. Waiting for CLUSTER_HEARTBEAT messages...',
        discoveredNodes: Array.from(discoveredNodes.keys())
      });
    }
    const systemStatus = await performWithFailover((client) => client.getSystemStatus());
    res.json(systemStatus);
  } catch (error) {
    console.error('Error fetching system status:', error);
    res.status(502).json({
      error: 'Failed to fetch system status',
      message: error.message
    });
  }
});
// GET /api/node/endpoints[?ip=<node>]
// Returns a node's capabilities. With ?ip= only that node is queried (500 on
// failure). Without it the primary is used with failover:
//   503 - nothing discovered yet
//   502 - every candidate node failed
app.get('/api/node/endpoints', async (req, res) => {
  try {
    const targetIp = req.query.ip;

    if (targetIp) {
      // Explicit target: talk to that node only, no failover
      try {
        const directClient = new SporeApiClient(`http://${targetIp}`);
        return res.json(await directClient.getCapabilities());
      } catch (innerError) {
        console.error('Error fetching endpoints from specific node:', innerError);
        return res.status(500).json({
          error: 'Failed to fetch endpoints from node',
          message: innerError.message
        });
      }
    }

    if (discoveredNodes.size === 0) {
      return res.status(503).json({
        error: 'Service unavailable',
        message: 'No SPORE nodes discovered yet. Waiting for CLUSTER_HEARTBEAT messages...',
        discoveredNodes: Array.from(discoveredNodes.keys())
      });
    }

    const fetchCapabilities = (client) => client.getCapabilities();
    return res.json(await performWithFailover(fetchCapabilities));
  } catch (error) {
    console.error('Error fetching capabilities:', error);
    return res.status(502).json({
      error: 'Failed to fetch capabilities',
      message: error.message
    });
  }
});
// POST /api/proxy-call
// Generic proxy that invokes an arbitrary capability on a node.
//
// Request body: { ip, method, uri, params? } where `params` is an array of
// { name, value, location } and `location` is 'query', 'path' or 'body'
// (default 'body').
//   - query params are appended to the URL
//   - path params replace `{name}` / `:name` placeholders in `uri`
//   - body params are sent form-encoded for every method except GET
//
// Responses:
//   400 - ip, method or uri missing
//   upstream status - the node answered with a non-2xx status
//   500 - request could not be performed
//   200 - { success: true, data } where data is parsed JSON or raw text
app.post('/api/proxy-call', async (req, res) => {
  // Parameter names come from the request body, so they must be matched
  // literally: escape every regex metacharacter before building a pattern.
  const escapeRegExp = (text) => String(text).replace(/[.*+?^${}()|[\]\\]/g, '\\$&');

  try {
    const { ip, method, uri, params } = req.body || {};
    if (!ip || !method || !uri) {
      return res.status(400).json({
        error: 'Missing required fields',
        message: 'Required: ip, method, uri'
      });
    }
    // Build target URL
    let targetPath = String(uri);
    const queryParams = new URLSearchParams();
    const bodyParams = new URLSearchParams();
    if (Array.isArray(params)) {
      for (const p of params) {
        const name = p?.name;
        const value = p?.value ?? '';
        const location = (p?.location || 'body').toLowerCase();
        if (!name) continue;
        if (location === 'query') {
          queryParams.append(name, String(value));
        } else if (location === 'path') {
          // Replace {name} or :name in path
          const placeholder = new RegExp(`[{:]${escapeRegExp(name)}[}]?`, 'g');
          const encodedValue = encodeURIComponent(String(value));
          // Function replacer: the value is inserted verbatim, never
          // interpreted as a `$`-style replacement pattern
          targetPath = targetPath.replace(placeholder, () => encodedValue);
        } else {
          // Default to body
          bodyParams.append(name, String(value));
        }
      }
    }
    // Keep the path separate from the host part even when the caller
    // omitted the leading slash
    if (!targetPath.startsWith('/')) {
      targetPath = `/${targetPath}`;
    }
    const queryString = queryParams.toString();
    const fullUrl = `http://${ip}${targetPath}${queryString ? `?${queryString}` : ''}`;
    // Prepare fetch options
    const upperMethod = String(method).toUpperCase();
    const fetchOptions = { method: upperMethod, headers: {} };
    if (upperMethod !== 'GET') {
      // Default to form-encoded body for generic proxy
      fetchOptions.headers['Content-Type'] = 'application/x-www-form-urlencoded';
      fetchOptions.body = bodyParams.toString();
    }
    // Debug logging to trace upstream requests
    try {
      logger.debug('[proxy-call] →', upperMethod, fullUrl);
      if (upperMethod !== 'GET') {
        logger.debug('[proxy-call] body:', fetchOptions.body);
      }
    } catch (_) {
      // Logging is best-effort; a logging failure must not fail the proxy call
    }
    // Execute request
    const response = await fetch(fullUrl, fetchOptions);
    const respContentType = response.headers.get('content-type') || '';
    let data;
    if (respContentType.includes('application/json')) {
      data = await response.json();
    } else {
      data = await response.text();
    }
    if (!response.ok) {
      // Surface upstream failure details for easier debugging
      console.warn('[proxy-call] Upstream error', response.status, response.statusText, 'for', upperMethod, fullUrl);
      return res.status(response.status).json({
        error: 'Upstream request failed',
        status: response.status,
        statusText: response.statusText,
        data
      });
    }
    return res.json({ success: true, data });
  } catch (error) {
    console.error('Error in /api/proxy-call:', error);
    return res.status(500).json({
      error: 'Proxy call failed',
      message: error.message
    });
  }
});
// GET /api/node/status/:ip
// Fetches the system status straight from the node named in the URL using a
// throwaway client (no failover). Responds 500 when the node cannot be queried.
app.get('/api/node/status/:ip', async (req, res) => {
  try {
    const directClient = new SporeApiClient(`http://${req.params.ip}`);
    res.json(await directClient.getSystemStatus());
  } catch (error) {
    const { ip: failedIp } = req.params;
    console.error(`Error fetching status from node ${failedIp}:`, error);
    res.status(500).json({
      error: `Failed to fetch status from node ${failedIp}`,
      message: error.message
    });
  }
});
// POST /api/cluster/refresh
// Triggers a cluster-update broadcast. An optional `reason` in the request
// body is logged and echoed back ('manual_refresh' when absent).
// Responds 500 if the broadcast throws.
app.post('/api/cluster/refresh', async (req, res) => {
  try {
    const requestBody = req.body || {};
    const reason = requestBody.reason;
    console.log(`🔄 Manual cluster refresh triggered: ${reason || 'unknown reason'}`);
    console.log(`📡 WebSocket clients connected: ${wsClients.size}`);

    const effectiveReason = reason || 'manual_refresh';
    broadcastMemberListChange(effectiveReason);

    res.json({
      success: true,
      message: 'Cluster refresh triggered',
      reason: effectiveReason,
      wsClients: wsClients.size
    });
  } catch (error) {
    console.error('Error triggering cluster refresh:', error);
    res.status(500).json({
      error: 'Failed to trigger cluster refresh',
      message: error.message
    });
  }
});
// POST /api/node/update
// Receives a firmware file (multipart field "file") and forwards it to a node.
// The target node comes from ?ip= or, failing that, the x-node-ip header.
//   400 - node IP missing, no file in the request, or the device answered
//         with status 'FAIL'
//   500 - the upload to the device threw
app.post('/api/node/update', async (req, res) => {
  try {
    const nodeIp = req.query.ip || req.headers['x-node-ip'];
    if (!nodeIp) {
      return res.status(400).json({
        error: 'Node IP address is required',
        message: 'Please provide the target node IP address'
      });
    }

    const uploadedFile = req.files ? req.files.file : undefined;
    if (!uploadedFile) {
      // Log what did arrive to make malformed uploads easier to diagnose
      console.log('File upload request received but no file found:', {
        hasFiles: !!req.files,
        fileKeys: req.files ? Object.keys(req.files) : [],
        contentType: req.headers['content-type']
      });
      return res.status(400).json({
        error: 'No file data received',
        message: 'Please select a firmware file to upload'
      });
    }

    const { name: filename, data: firmware } = uploadedFile;
    console.log(`File upload received:`, {
      nodeIp: nodeIp,
      filename: filename,
      fileSize: firmware.length,
      mimetype: uploadedFile.mimetype,
      encoding: uploadedFile.encoding
    });

    // Throwaway client dedicated to the target node
    const nodeClient = new SporeApiClient(`http://${nodeIp}`);
    console.log(`Created SPORE client for node ${nodeIp}`);
    console.log(`Starting firmware upload to SPORE device ${nodeIp}...`);

    try {
      const updateResult = await nodeClient.updateFirmware(firmware, filename);
      console.log(`Firmware upload to SPORE device ${nodeIp} completed:`, updateResult);

      // Fields shared by the success and device-failure responses
      const uploadDetails = {
        nodeIp: nodeIp,
        fileSize: firmware.length,
        filename: filename,
        result: updateResult
      };

      const deviceRejected = updateResult && updateResult.status === 'FAIL';
      if (deviceRejected) {
        console.error(`SPORE device ${nodeIp} reported firmware update failure:`, updateResult.message);
        return res.status(400).json({
          success: false,
          error: 'Firmware update failed',
          message: updateResult.message || 'Firmware update failed on device',
          ...uploadDetails
        });
      }

      res.json({
        success: true,
        message: 'Firmware uploaded successfully',
        ...uploadDetails
      });
    } catch (uploadError) {
      console.error(`Firmware upload to SPORE device ${nodeIp} failed:`, uploadError);
      throw new Error(`SPORE device upload failed: ${uploadError.message}`);
    }
  } catch (error) {
    console.error('Error uploading firmware:', error);
    res.status(500).json({
      error: 'Failed to upload firmware',
      message: error.message
    });
  }
});
// GET /api/health
// Reports service and cluster health. The status is 'healthy' (HTTP 200) only
// when at least one node is known and a SPORE client exists; otherwise it is
// 'degraded' (HTTP 503) with a message listing every problem found.
app.get('/api/health', (req, res) => {
  const problems = [];
  if (discoveredNodes.size === 0) {
    problems.push('No SPORE nodes discovered yet');
  }
  if (!sporeClient) {
    problems.push('SPORE client not initialized');
  }
  const isHealthy = problems.length === 0;

  const health = {
    status: isHealthy ? 'healthy' : 'degraded',
    timestamp: new Date().toISOString(),
    services: {
      http: true,
      udp: udpServer.listening,
      sporeClient: !!sporeClient
    },
    cluster: {
      totalNodes: discoveredNodes.size,
      primaryNode: primaryNodeIp,
      udpPort: UDP_PORT,
      serverRunning: udpServer.listening
    }
  };
  if (!isHealthy) {
    health.message = problems.join('; ');
  }

  res.status(isHealthy ? 200 : 503).json(health);
});
// WebSocket state: `wss` stays null until the HTTP server is up; `wsClients`
// tracks every connected socket.
let wss = null;
const wsClients = new Set();

/**
 * Sends a `cluster_update` message (members, primary node, node count,
 * timestamp) to every open WebSocket client.
 *
 * Returns immediately; the member list is fetched asynchronously and any
 * failure along the way is logged. Does nothing when there is no WebSocket
 * server or no connected client.
 */
function broadcastClusterUpdate() {
  const nobodyListening = !wss || wsClients.size === 0;
  if (nobodyListening) return;

  const startedAt = Date.now();
  logger.debug(`📡 [${new Date().toISOString()}] Starting cluster update broadcast to ${wsClients.size} clients`);

  const deliver = (members) => {
    const payload = JSON.stringify({
      type: 'cluster_update',
      members: members,
      primaryNode: primaryNodeIp,
      totalNodes: discoveredNodes.size,
      timestamp: new Date().toISOString()
    });
    const elapsedMs = Date.now() - startedAt;
    logger.debug(`📡 [${new Date().toISOString()}] Broadcasting cluster update to ${wsClients.size} WebSocket clients (took ${elapsedMs}ms)`);
    logger.debug(`📊 Cluster data: ${members.length} members, primary: ${primaryNodeIp || 'none'}`);

    // Only sockets that are fully open receive the message
    for (const client of wsClients) {
      if (client.readyState === WebSocket.OPEN) {
        client.send(payload);
      }
    }
  };

  getCurrentClusterMembers()
    .then(deliver)
    .catch((error) => {
      console.error('Error broadcasting cluster update:', error);
    });
}
/**
 * Sends a `node_discovery` event to every open WebSocket client.
 * Does nothing when there is no WebSocket server or no connected client.
 *
 * @param {string} nodeIp - IP of the node the event is about.
 * @param {string} action - Event kind; callers in this file pass 'discovered' or 'stale'.
 */
function broadcastNodeDiscovery(nodeIp, action) {
  const nobodyListening = !wss || wsClients.size === 0;
  if (nobodyListening) return;

  const payload = JSON.stringify({
    type: 'node_discovery',
    action: action,
    nodeIp: nodeIp,
    timestamp: new Date().toISOString()
  });

  // Only sockets that are fully open receive the message
  for (const client of wsClients) {
    if (client.readyState === WebSocket.OPEN) {
      client.send(payload);
    }
  }
}
/**
 * Logs (at debug level) why the member list changed, then triggers a
 * cluster-update broadcast.
 *
 * @param {string} [reason='update'] - Short description used only in the log line.
 */
function broadcastMemberListChange(reason = 'update') {
  const loggedAt = new Date().toISOString();
  const logLine = `🔄 [${loggedAt}] Member list changed (${reason}), broadcasting update`;
  logger.debug(logLine);
  broadcastClusterUpdate();
}
/**
 * Builds the member list that is pushed to WebSocket clients.
 *
 * Normal path: the cluster status is fetched from a node (with failover),
 * the local node table is synchronised with it, and each API member is
 * returned merged with the locally stored status, hostname, lastSeen,
 * labels and resources.
 *
 * Fallback path: if anything above throws, the locally stored node table is
 * returned instead (with defaults for missing fields).
 *
 * Side effects: entries in `discoveredNodes` are updated, and members
 * reported by the API but unknown locally are added.
 *
 * @returns {Promise<object[]>} Member objects; an empty array when no nodes
 *   are known. Never rejects.
 */
async function getCurrentClusterMembers() {
  try {
    if (discoveredNodes.size === 0) {
      return [];
    }
    // Fetch real cluster data from SPORE nodes for accurate information
    logger.debug(`📡 Fetching real cluster data from ${discoveredNodes.size} nodes for WebSocket broadcast`);
    const clusterResponse = await performWithFailover((client) => client.getClusterStatus());
    const apiMembers = clusterResponse.members || [];
    // Debug: Log the labels from the API response
    apiMembers.forEach(member => {
      if (member.labels && Object.keys(member.labels).length > 0) {
        logger.debug(`🏷️ API member ${member.ip} labels:`, member.labels);
      }
    });
    // Update our local discoveredNodes with fresh information from the API
    let updatedNodes = false;
    apiMembers.forEach(apiMember => {
      const localNode = discoveredNodes.get(apiMember.ip);
      if (localNode) {
        // Update local node with fresh API data
        const needsUpdate =
          localNode.hostname !== apiMember.hostname ||
          localNode.status !== apiMember.status ||
          localNode.latency !== apiMember.latency ||
          JSON.stringify(localNode.labels) !== JSON.stringify(apiMember.labels);
        if (needsUpdate) {
          logger.debug(`🔄 Updating local node ${apiMember.ip} with fresh API data`);
          localNode.hostname = apiMember.hostname;
          localNode.status = apiMember.status;
          localNode.latency = apiMember.latency;
          localNode.labels = apiMember.labels || {};
          localNode.lastSeen = Math.floor(Date.now() / 1000);
          updatedNodes = true;
        }
      } else {
        // New node discovered via API - shouldn't happen but handle it
        logger.debug(`🆕 New node discovered via API: ${apiMember.ip}`);
        // Timestamps in the node table are epoch SECONDS (see the heartbeat
        // path); storing a Date here would be misread by consumers that
        // multiply by 1000 before formatting.
        const nowSeconds = Math.floor(Date.now() / 1000);
        discoveredNodes.set(apiMember.ip, {
          ip: apiMember.ip,
          hostname: apiMember.hostname,
          status: apiMember.status,
          latency: apiMember.latency,
          labels: apiMember.labels || {},
          discoveredAt: nowSeconds,
          lastSeen: nowSeconds
        });
        updatedNodes = true;
      }
    });
    // If we updated any nodes, note it in the debug log
    if (updatedNodes) {
      logger.debug(`📡 Local node data updated, triggering immediate broadcast`);
      // Note: We don't call broadcastMemberListChange here because we're already in the middle of a broadcast
      // The calling function will handle the broadcast
    }
    // Enhance API data with our local status information
    const enhancedMembers = apiMembers.map(apiMember => {
      const localNode = discoveredNodes.get(apiMember.ip);
      if (localNode) {
        // Locally stored values win; API values fill in whatever is missing
        return {
          ...apiMember,
          status: localNode.status || apiMember.status,
          hostname: localNode.hostname || apiMember.hostname,
          lastSeen: localNode.lastSeen || apiMember.lastSeen,
          labels: localNode.labels || apiMember.labels || {},
          resources: localNode.resources || apiMember.resources || {}
        };
      }
      return apiMember;
    });
    logger.debug(`📊 Returning ${enhancedMembers.length} enhanced cluster members via WebSocket`);
    return enhancedMembers;
  } catch (error) {
    console.error('Error getting cluster members for WebSocket:', error);
    // Fallback to local data if API fails
    logger.debug('⚠️ API failed, falling back to local discoveredNodes data');
    const fallbackMembers = Array.from(discoveredNodes.values()).map(node => ({
      ip: node.ip,
      hostname: node.hostname || 'Unknown Device',
      status: node.status || 'active', // Use stored status (may be 'inactive')
      latency: node.latency || 0,
      lastSeen: node.lastSeen || Math.floor(Date.now() / 1000),
      labels: node.labels || {},
      resources: node.resources || {}
    }));
    logger.debug(`📊 Fallback: Returning ${fallbackMembers.length} local cluster members`);
    return fallbackMembers;
  }
}
/**
 * Attaches a WebSocket server to the given HTTP server and wires up client
 * tracking.
 *
 * Each new connection is added to `wsClients` and, when nodes are already
 * known, is sent the current cluster state once it has been fetched.
 * Sockets are dropped from `wsClients` on 'close' and on 'error'.
 *
 * @param {object} httpServer - The HTTP server to share the port with.
 */
function initializeWebSocketServer(httpServer) {
  wss = new WebSocket.Server({ server: httpServer });

  // Fetch the member list in the background and push it to one socket
  const sendInitialState = (socket) => {
    getCurrentClusterMembers()
      .then((members) => {
        const snapshot = {
          type: 'cluster_update',
          members: members,
          primaryNode: primaryNodeIp,
          totalNodes: discoveredNodes.size,
          timestamp: new Date().toISOString()
        };
        logger.debug(`🔌 Sending initial cluster state to new WebSocket client: ${members.length} members`);
        socket.send(JSON.stringify(snapshot));
      })
      .catch((error) => {
        console.error('Error sending initial cluster state:', error);
      });
  };

  wss.on('connection', (ws) => {
    logger.debug('WebSocket client connected');
    wsClients.add(ws);

    if (discoveredNodes.size > 0) {
      sendInitialState(ws);
    }

    ws.on('close', () => {
      logger.debug('WebSocket client disconnected');
      wsClients.delete(ws);
    });
    ws.on('error', (error) => {
      console.error('WebSocket error:', error);
      wsClients.delete(ws);
    });
  });

  console.log('WebSocket server initialized');
}
// Start the HTTP server on all interfaces. Once it is listening, print the
// startup banner and attach the WebSocket server to the same HTTP server.
function onHttpServerListening() {
  console.log(`Server is running on http://0.0.0.0:${PORT}`);
  console.log(`Accessible from: http://YOUR_COMPUTER_IP:${PORT}`);
  console.log(`UDP heartbeat server listening on port ${UDP_PORT}`);
  // The WebSocket server needs the running HTTP server instance
  initializeWebSocketServer(server);
  console.log('WebSocket server ready for real-time updates');
  console.log('Waiting for CLUSTER_HEARTBEAT and NODE_UPDATE messages from SPORE nodes...');
}
const server = app.listen(PORT, '0.0.0.0', onHttpServerListening);
/**
 * Graceful shutdown shared by SIGINT and SIGTERM: close the UDP socket and
 * the HTTP server, then exit with code 0 once the HTTP server has closed.
 *
 * @param {string} signalName - Name of the signal being handled (used in the log line).
 */
function shutdownGracefully(signalName) {
  console.log(`\nReceived ${signalName}. Shutting down gracefully...`);
  udpServer.close(() => {
    console.log('UDP heartbeat server closed.');
  });
  server.close(() => {
    console.log('HTTP server closed.');
    process.exit(0);
  });
}

for (const signalName of ['SIGINT', 'SIGTERM']) {
  process.on(signalName, () => shutdownGracefully(signalName));
}
// Last-resort handlers: log the failure, close both servers (without
// waiting for them to finish) and exit with code 1.
function closeServersAndExit() {
  udpServer.close();
  server.close();
  process.exit(1);
}

process.on('uncaughtException', (err) => {
  console.error('Uncaught Exception:', err);
  closeServersAndExit();
});

process.on('unhandledRejection', (reason, promise) => {
  console.error('Unhandled Rejection at:', promise, 'reason:', reason);
  closeServersAndExit();
});