feat: live updates
This commit is contained in:
392
index.js
392
index.js
@@ -4,6 +4,7 @@ const fs = require('fs');
|
||||
const dgram = require('dgram');
|
||||
const SporeApiClient = require('./src/client');
|
||||
const cors = require('cors');
|
||||
const WebSocket = require('ws');
|
||||
|
||||
const app = express();
|
||||
const PORT = process.env.PORT || 3001;
|
||||
@@ -31,6 +32,7 @@ app.use(cors({
|
||||
// UDP discovery configuration
|
||||
const UDP_PORT = 4210;
|
||||
const DISCOVERY_MESSAGE = 'CLUSTER_DISCOVERY';
|
||||
const STALE_THRESHOLD_SECONDS = 3; // 3 seconds for faster detection
|
||||
|
||||
// Initialize UDP server for auto discovery
|
||||
const udpServer = dgram.createSocket('udp4');
|
||||
@@ -55,34 +57,106 @@ udpServer.on('message', (msg, rinfo) => {
|
||||
const sourceIp = rinfo.address;
|
||||
const sourcePort = rinfo.port;
|
||||
|
||||
//console.log(`UDP message received from ${sourceIp}:${sourcePort}: "${message}"`);
|
||||
// Only log non-discovery messages to reduce noise
|
||||
if (message !== DISCOVERY_MESSAGE) {
|
||||
console.log(`📨 UDP message received from ${sourceIp}:${sourcePort}: "${message}"`);
|
||||
}
|
||||
|
||||
if (message === DISCOVERY_MESSAGE) {
|
||||
//console.log(`Received CLUSTER_DISCOVERY from ${sourceIp}:${sourcePort}`);
|
||||
|
||||
|
||||
// Store the discovered node
|
||||
const now = Math.floor(Date.now() / 1000);
|
||||
const nodeInfo = {
|
||||
ip: sourceIp,
|
||||
port: sourcePort,
|
||||
discoveredAt: new Date(),
|
||||
lastSeen: new Date()
|
||||
status: 'active', // New nodes from discovery are active
|
||||
discoveredAt: now,
|
||||
lastSeen: now
|
||||
};
|
||||
|
||||
|
||||
const isNewNode = !discoveredNodes.has(sourceIp);
|
||||
discoveredNodes.set(sourceIp, nodeInfo);
|
||||
|
||||
|
||||
// Set as primary node if this is the first one or if we don't have one
|
||||
if (!primaryNodeIp) {
|
||||
primaryNodeIp = sourceIp;
|
||||
console.log(`Set primary node to ${sourceIp}`);
|
||||
|
||||
|
||||
// Immediately try to initialize the client
|
||||
updateSporeClient();
|
||||
}
|
||||
|
||||
|
||||
// Update last seen timestamp
|
||||
discoveredNodes.get(sourceIp).lastSeen = new Date();
|
||||
|
||||
//console.log(`Node ${sourceIp} added/updated. Total discovered nodes: ${discoveredNodes.size}`);
|
||||
discoveredNodes.get(sourceIp).lastSeen = Math.floor(Date.now() / 1000);
|
||||
|
||||
// Broadcast discovery event if this is a new node
|
||||
if (isNewNode) {
|
||||
console.log(`🆕 NEW NODE DISCOVERED: ${sourceIp}:${sourcePort} via CLUSTER_DISCOVERY. Total nodes: ${discoveredNodes.size}`);
|
||||
broadcastNodeDiscovery(sourceIp, 'discovered');
|
||||
// Broadcast cluster update after a short delay to allow member data to be fetched
|
||||
setTimeout(() => broadcastMemberListChange('new discovery'), 1000);
|
||||
}
|
||||
} else if (message.startsWith('CLUSTER_HEARTBEAT:')) {
|
||||
// Handle heartbeat messages that also update member list
|
||||
const hostname = message.substring('CLUSTER_HEARTBEAT:'.length);
|
||||
// Update or create node entry from heartbeat
|
||||
const existingNode = discoveredNodes.get(sourceIp);
|
||||
const now = Math.floor(Date.now() / 1000); // Use Unix timestamp for consistency
|
||||
|
||||
if (existingNode) {
|
||||
// Update existing node
|
||||
const wasStale = existingNode.status === 'inactive';
|
||||
const oldHostname = existingNode.hostname;
|
||||
|
||||
existingNode.lastSeen = now;
|
||||
existingNode.hostname = hostname;
|
||||
existingNode.status = 'active'; // Mark as active when heartbeat received
|
||||
|
||||
console.log(`💓 Heartbeat from ${sourceIp}:${sourcePort} (${hostname}). Total nodes: ${discoveredNodes.size}`);
|
||||
|
||||
// Check if hostname changed
|
||||
const hostnameChanged = oldHostname !== hostname;
|
||||
if (hostnameChanged) {
|
||||
console.log(`🔄 Hostname updated for ${sourceIp}: "${oldHostname}" -> "${hostname}"`);
|
||||
}
|
||||
|
||||
// ALWAYS broadcast every heartbeat for immediate UI updates
|
||||
// This ensures the UI gets real-time updates without delay
|
||||
const reason = wasStale ? 'node became active' :
|
||||
hostnameChanged ? 'hostname update' :
|
||||
'active heartbeat';
|
||||
|
||||
console.log(`📡 Broadcasting heartbeat update: ${reason}`);
|
||||
broadcastMemberListChange(reason);
|
||||
} else {
|
||||
// Create new node entry from heartbeat - NEW NODE DISCOVERED
|
||||
const nodeInfo = {
|
||||
ip: sourceIp,
|
||||
port: sourcePort,
|
||||
hostname: hostname,
|
||||
status: 'active', // New nodes from heartbeat are active
|
||||
discoveredAt: now,
|
||||
lastSeen: now
|
||||
};
|
||||
discoveredNodes.set(sourceIp, nodeInfo);
|
||||
|
||||
console.log(`🆕 NEW NODE DISCOVERED: ${sourceIp}:${sourcePort} (${hostname}) via heartbeat. Total nodes: ${discoveredNodes.size}`);
|
||||
|
||||
// Set as primary node if this is the first one or if we don't have one
|
||||
if (!primaryNodeIp) {
|
||||
primaryNodeIp = sourceIp;
|
||||
console.log(`Set primary node to ${sourceIp} (from heartbeat)`);
|
||||
|
||||
// Immediately try to initialize the client
|
||||
updateSporeClient();
|
||||
}
|
||||
|
||||
// Broadcast discovery event for new node from heartbeat
|
||||
broadcastNodeDiscovery(sourceIp, 'discovered');
|
||||
// Broadcast cluster update after a short delay to allow member data to be fetched
|
||||
setTimeout(() => broadcastMemberListChange('new heartbeat discovery'), 1000);
|
||||
}
|
||||
} else if (!message.startsWith('RAW:')) {
|
||||
console.log(`Received unknown message from ${sourceIp}:${sourcePort}: "${message}"`);
|
||||
}
|
||||
@@ -121,23 +195,46 @@ function initializeSporeClient(nodeIp) {
|
||||
}
|
||||
}
|
||||
|
||||
// Function to clean up stale discovered nodes (nodes not seen in the last 5 minutes)
|
||||
function cleanupStaleNodes() {
|
||||
const now = new Date();
|
||||
const staleThreshold = 5 * 60 * 1000; // 5 minutes in milliseconds
|
||||
|
||||
// Function to mark stale nodes as inactive (instead of removing them)
|
||||
function markStaleNodes() {
|
||||
const now = Math.floor(Date.now() / 1000);
|
||||
|
||||
let nodesMarkedStale = false;
|
||||
|
||||
for (const [ip, node] of discoveredNodes.entries()) {
|
||||
if (now - node.lastSeen > staleThreshold) {
|
||||
console.log(`Removing stale node: ${ip} (last seen: ${node.lastSeen.toISOString()})`);
|
||||
discoveredNodes.delete(ip);
|
||||
|
||||
// If this was our primary node, clear it
|
||||
const timeSinceLastSeen = now - node.lastSeen;
|
||||
|
||||
if (timeSinceLastSeen > STALE_THRESHOLD_SECONDS && node.status !== 'inactive') {
|
||||
console.log(`💀 NODE MARKED INACTIVE: ${ip} (${node.hostname || 'Unknown'}) - last seen ${timeSinceLastSeen}s ago (threshold: ${STALE_THRESHOLD_SECONDS}s)`);
|
||||
node.status = 'inactive';
|
||||
nodesMarkedStale = true;
|
||||
|
||||
// Broadcast stale node event immediately
|
||||
console.log(`📡 Broadcasting stale node event for ${ip}`);
|
||||
broadcastNodeDiscovery(ip, 'stale');
|
||||
|
||||
// If this was our primary node, clear it and select a new one
|
||||
if (primaryNodeIp === ip) {
|
||||
primaryNodeIp = null;
|
||||
console.log('Primary node became stale, clearing primary node selection');
|
||||
console.log('🚫 PRIMARY NODE BECAME STALE: Clearing primary node selection');
|
||||
|
||||
// Automatically select a new primary node from remaining healthy nodes
|
||||
const newPrimary = selectBestPrimaryNode();
|
||||
if (newPrimary) {
|
||||
console.log(`✅ NEW PRIMARY NODE SELECTED: ${newPrimary} (auto-selected after stale cleanup)`);
|
||||
// Update the SPORE client to use the new primary node
|
||||
updateSporeClient();
|
||||
} else {
|
||||
console.log('⚠️ No healthy nodes available for primary selection');
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Broadcast cluster update if any nodes were marked stale
|
||||
if (nodesMarkedStale) {
|
||||
broadcastMemberListChange('nodes marked stale');
|
||||
}
|
||||
}
|
||||
|
||||
// Function to select the best primary node
|
||||
@@ -165,6 +262,7 @@ function selectBestPrimaryNode() {
|
||||
if (bestNode && bestNode !== primaryNodeIp) {
|
||||
primaryNodeIp = bestNode;
|
||||
console.log(`Selected new primary node: ${bestNode}`);
|
||||
broadcastMemberListChange('primary node change');
|
||||
}
|
||||
|
||||
return bestNode;
|
||||
@@ -191,7 +289,8 @@ function selectRandomPrimaryNode() {
|
||||
// Update primary node
|
||||
primaryNodeIp = randomNode;
|
||||
console.log(`Randomly selected new primary node: ${randomNode}`);
|
||||
|
||||
broadcastMemberListChange('random primary node selection');
|
||||
|
||||
return randomNode;
|
||||
}
|
||||
|
||||
@@ -234,6 +333,7 @@ async function performWithFailover(operation) {
|
||||
primaryNodeIp = ip;
|
||||
sporeClient = client;
|
||||
console.log(`Failover: switched primary node to ${ip}`);
|
||||
broadcastMemberListChange('failover primary node switch');
|
||||
}
|
||||
return result;
|
||||
} catch (err) {
|
||||
@@ -248,11 +348,11 @@ async function performWithFailover(operation) {
|
||||
|
||||
// Set up periodic tasks
|
||||
setInterval(() => {
|
||||
cleanupStaleNodes();
|
||||
markStaleNodes();
|
||||
if (!sporeClient || !primaryNodeIp || !discoveredNodes.has(primaryNodeIp)) {
|
||||
updateSporeClient();
|
||||
}
|
||||
}, 5000); // Check every 5 seconds
|
||||
}, 2000); // Check every 2 seconds for faster stale detection
|
||||
|
||||
// Serve static files from public directory
|
||||
app.use(express.static(path.join(__dirname, 'public')));
|
||||
@@ -266,8 +366,8 @@ app.get('/', (req, res) => {
|
||||
app.get('/api/discovery/nodes', (req, res) => {
|
||||
const nodes = Array.from(discoveredNodes.values()).map(node => ({
|
||||
...node,
|
||||
discoveredAt: node.discoveredAt.toISOString(),
|
||||
lastSeen: node.lastSeen.toISOString(),
|
||||
discoveredAt: new Date(node.discoveredAt * 1000).toISOString(),
|
||||
lastSeen: new Date(node.lastSeen * 1000).toISOString(),
|
||||
isPrimary: node.ip === primaryNodeIp
|
||||
}));
|
||||
|
||||
@@ -288,12 +388,15 @@ app.get('/api/discovery/nodes', (req, res) => {
|
||||
// API endpoint to manually trigger discovery refresh
|
||||
app.post('/api/discovery/refresh', (req, res) => {
|
||||
try {
|
||||
// Clean up stale nodes
|
||||
cleanupStaleNodes();
|
||||
|
||||
// Mark stale nodes as inactive
|
||||
markStaleNodes();
|
||||
|
||||
// Try to update the client
|
||||
updateSporeClient();
|
||||
|
||||
|
||||
// Broadcast cluster update via WebSocket
|
||||
broadcastMemberListChange('manual refresh');
|
||||
|
||||
res.json({
|
||||
success: true,
|
||||
message: 'Discovery refresh completed',
|
||||
@@ -310,6 +413,27 @@ app.post('/api/discovery/refresh', (req, res) => {
|
||||
}
|
||||
});
|
||||
|
||||
// Diagnostic endpoint: forces a member-list broadcast to every connected
// WebSocket client and reports how many clients and nodes are currently known.
app.post('/api/test/websocket', function handleWebSocketTest(req, res) {
  try {
    console.log('🧪 Manual WebSocket test triggered');
    broadcastMemberListChange('manual test');

    const summary = {
      success: true,
      message: 'WebSocket test broadcast sent',
      websocketClients: wsClients.size,
      totalNodes: discoveredNodes.size
    };
    res.json(summary);
  } catch (error) {
    console.error('Error during WebSocket test:', error);

    const failure = {
      error: 'WebSocket test failed',
      message: error.message
    };
    res.status(500).json(failure);
  }
});
|
||||
|
||||
// API endpoint to randomly select a new primary node
|
||||
app.post('/api/discovery/random-primary', (req, res) => {
|
||||
try {
|
||||
@@ -367,6 +491,7 @@ app.post('/api/discovery/primary/:ip', (req, res) => {
|
||||
|
||||
primaryNodeIp = requestedIp;
|
||||
updateSporeClient();
|
||||
broadcastMemberListChange('manual primary node setting');
|
||||
|
||||
res.json({
|
||||
success: true,
|
||||
@@ -721,11 +846,216 @@ app.get('/api/health', (req, res) => {
|
||||
|
||||
|
||||
|
||||
// WebSocket server setup - will be initialized after HTTP server.
// `wss` stays null until initializeWebSocketServer() assigns it; the broadcast
// helpers return early while it is null.
|
||||
let wss = null;
|
||||
// Sockets of currently connected clients: added on 'connection', removed on
// 'close' or 'error'. Broadcasts iterate over this set.
const wsClients = new Set();
|
||||
|
||||
/**
 * Push a `cluster_update` message to every open WebSocket client.
 *
 * Does nothing when the WebSocket server is not initialised or no client is
 * connected. The member list is resolved asynchronously through
 * getCurrentClusterMembers(); failures are logged, never thrown to the caller.
 */
function broadcastClusterUpdate() {
  if (wsClients.size === 0 || !wss) {
    return;
  }

  const startedAt = Date.now();
  console.log(`📡 [${new Date().toISOString()}] Starting cluster update broadcast to ${wsClients.size} clients`);

  // Serialise the snapshot once and fan it out to the sockets that are open.
  const publish = (members) => {
    const payload = JSON.stringify({
      type: 'cluster_update',
      members: members,
      primaryNode: primaryNodeIp,
      totalNodes: discoveredNodes.size,
      timestamp: new Date().toISOString()
    });

    const elapsedMs = Date.now() - startedAt;
    console.log(`📡 [${new Date().toISOString()}] Broadcasting cluster update to ${wsClients.size} WebSocket clients (took ${elapsedMs}ms)`);
    console.log(`📊 Cluster data: ${members.length} members, primary: ${primaryNodeIp || 'none'}`);

    for (const socket of wsClients) {
      if (socket.readyState !== WebSocket.OPEN) {
        continue;
      }
      socket.send(payload);
    }
  };

  const reportFailure = (error) => {
    console.error('Error broadcasting cluster update:', error);
  };

  getCurrentClusterMembers().then(publish).catch(reportFailure);
}
|
||||
|
||||
/**
 * Send a `node_discovery` event to every open WebSocket client.
 *
 * @param {string} nodeIp - IP address of the node the event is about.
 * @param {string} action - Event kind; callers in this file pass 'discovered' or 'stale'.
 */
function broadcastNodeDiscovery(nodeIp, action) {
  if (wsClients.size === 0 || !wss) {
    return;
  }

  const payload = JSON.stringify({
    type: 'node_discovery',
    action: action,
    nodeIp: nodeIp,
    timestamp: new Date().toISOString()
  });

  for (const socket of wsClients) {
    if (socket.readyState !== WebSocket.OPEN) {
      continue;
    }
    socket.send(payload);
  }
}
|
||||
|
||||
/**
 * Log why the member list changed, then trigger a cluster update broadcast.
 *
 * @param {string} [reason='update'] - Short description included in the log line.
 */
function broadcastMemberListChange(reason = 'update') {
  const loggedAt = new Date().toISOString();
  console.log(`🔄 [${loggedAt}] Member list changed (${reason}), broadcasting update`);

  broadcastClusterUpdate();
}
|
||||
|
||||
/**
 * Build the member list that is pushed to WebSocket clients.
 *
 * Calls `client.getClusterStatus()` through performWithFailover, copies the
 * descriptive fields of each returned member (hostname, latency, labels) into
 * the matching discoveredNodes entry, registers members that were not known
 * locally, and returns the API members overlaid with the locally tracked
 * status, hostname, lastSeen, labels and resources. If anything on that path
 * throws, the error is logged and the list is built from discoveredNodes only.
 *
 * Liveness is owned by the local tracking: `lastSeen` of an existing node is
 * never touched here, and a node whose local status is 'inactive' keeps that
 * status no matter what the API reports. Otherwise the broadcast announcing a
 * stale node would immediately flip it back to active.
 *
 * @returns {Promise<Array<Object>>} Member objects; empty when no node has
 *   been discovered yet.
 */
async function getCurrentClusterMembers() {
  try {
    if (discoveredNodes.size === 0) {
      return [];
    }

    // Fetch real cluster data from SPORE nodes for accurate information
    console.log(`📡 Fetching real cluster data from ${discoveredNodes.size} nodes for WebSocket broadcast`);
    const clusterResponse = await performWithFailover((client) => client.getClusterStatus());
    const apiMembers = clusterResponse.members || [];
    const nowSeconds = Math.floor(Date.now() / 1000);

    // Sync the local registry with what the API reported.
    let updatedNodes = false;
    for (const apiMember of apiMembers) {
      // Normalise once so the comparison and the stored value agree; comparing
      // against the raw (possibly undefined) labels would report a change on
      // every call for members without labels.
      const apiLabels = apiMember.labels || {};
      const localNode = discoveredNodes.get(apiMember.ip);

      if (!localNode) {
        // New node discovered via API - shouldn't happen but handle it
        console.log(`🆕 New node discovered via API: ${apiMember.ip}`);
        discoveredNodes.set(apiMember.ip, {
          ip: apiMember.ip,
          hostname: apiMember.hostname,
          status: apiMember.status,
          latency: apiMember.latency,
          labels: apiLabels,
          // Unix seconds, like every other writer of discoveredAt/lastSeen;
          // a Date object here breaks `new Date(discoveredAt * 1000)` readers.
          discoveredAt: nowSeconds,
          lastSeen: nowSeconds
        });
        updatedNodes = true;
        continue;
      }

      const locallyInactive = localNode.status === 'inactive';
      const needsUpdate =
        localNode.hostname !== apiMember.hostname ||
        localNode.latency !== apiMember.latency ||
        (!locallyInactive && localNode.status !== apiMember.status) ||
        JSON.stringify(localNode.labels) !== JSON.stringify(apiLabels);

      if (needsUpdate) {
        console.log(`🔄 Updating local node ${apiMember.ip} with fresh API data`);
        localNode.hostname = apiMember.hostname;
        localNode.latency = apiMember.latency;
        localNode.labels = apiLabels;
        if (!locallyInactive) {
          localNode.status = apiMember.status;
        }
        updatedNodes = true;
      }
    }

    if (updatedNodes) {
      // No broadcast is started from here: this function runs as part of one,
      // and the caller sends the result.
      console.log(`📡 Local node data updated, triggering immediate broadcast`);
    }

    // Enhance API data with our local status information
    const enhancedMembers = apiMembers.map((apiMember) => {
      const localNode = discoveredNodes.get(apiMember.ip);
      if (!localNode) {
        return apiMember;
      }
      // Local values win (status may be 'inactive' if the node became stale).
      return {
        ...apiMember,
        status: localNode.status || apiMember.status,
        hostname: localNode.hostname || apiMember.hostname,
        lastSeen: localNode.lastSeen || apiMember.lastSeen,
        labels: localNode.labels || apiMember.labels || {},
        resources: localNode.resources || apiMember.resources || {}
      };
    });

    console.log(`📊 Returning ${enhancedMembers.length} enhanced cluster members via WebSocket`);
    return enhancedMembers;
  } catch (error) {
    console.error('Error getting cluster members for WebSocket:', error);

    // Fallback to local data if API fails
    console.log('⚠️ API failed, falling back to local discoveredNodes data');
    const fallbackMembers = Array.from(discoveredNodes.values()).map((node) => ({
      ip: node.ip,
      hostname: node.hostname || 'Unknown Device',
      status: node.status || 'active', // Use stored status (may be 'inactive')
      latency: node.latency || 0,
      lastSeen: node.lastSeen || Math.floor(Date.now() / 1000),
      labels: node.labels || {},
      resources: node.resources || {}
    }));

    console.log(`📊 Fallback: Returning ${fallbackMembers.length} local cluster members`);
    return fallbackMembers;
  }
}
|
||||
|
||||
/**
 * Attach a WebSocket server to the given HTTP server and track its clients.
 *
 * Each new connection is added to wsClients and removed again on 'close' or
 * 'error'. When at least one node has been discovered, the client is sent the
 * current cluster state as a `cluster_update` message once it has been
 * fetched.
 *
 * @param {Object} httpServer - HTTP server instance passed to WebSocket.Server
 *   as its `server` option.
 */
function initializeWebSocketServer(httpServer) {
  wss = new WebSocket.Server({ server: httpServer });

  // WebSocket connection handler
  wss.on('connection', (ws) => {
    console.log('WebSocket client connected');
    wsClients.add(ws);

    // Handle client disconnection
    ws.on('close', () => {
      console.log('WebSocket client disconnected');
      wsClients.delete(ws);
    });

    // Handle WebSocket errors
    ws.on('error', (error) => {
      console.error('WebSocket error:', error);
      wsClients.delete(ws);
    });

    // Send current cluster state to newly connected client
    if (discoveredNodes.size > 0) {
      // Get cluster members asynchronously without blocking
      getCurrentClusterMembers().then((members) => {
        // The fetch is asynchronous, so the client may have gone away in the
        // meantime; apply the same readyState guard as the broadcast helpers.
        if (ws.readyState !== WebSocket.OPEN) {
          return;
        }

        const clusterData = {
          type: 'cluster_update',
          members: members,
          primaryNode: primaryNodeIp,
          totalNodes: discoveredNodes.size,
          timestamp: new Date().toISOString()
        };
        console.log(`🔌 Sending initial cluster state to new WebSocket client: ${members.length} members`);
        ws.send(JSON.stringify(clusterData));
      }).catch((error) => {
        console.error('Error sending initial cluster state:', error);
      });
    }
  });

  console.log('WebSocket server initialized');
}
|
||||
|
||||
// Start the HTTP server on all interfaces; once it is listening, attach the
// WebSocket server to it and log how to reach the service.
const server = app.listen(PORT, '0.0.0.0', function onListening() {
  const startupLines = [
    `Server is running on http://0.0.0.0:${PORT}`,
    `Accessible from: http://YOUR_COMPUTER_IP:${PORT}`,
    `UDP discovery server listening on port ${UDP_PORT}`
  ];
  for (const line of startupLines) {
    console.log(line);
  }

  // The WebSocket server shares the HTTP server, so it is created only now.
  initializeWebSocketServer(server);
  console.log('WebSocket server ready for real-time updates');

  console.log('Waiting for CLUSTER_DISCOVERY messages from SPORE nodes...');
});
|
||||
|
||||
|
||||
Reference in New Issue
Block a user